diff --git a/queryable/parquet_queryable.go b/queryable/parquet_queryable.go
index 3bcd8bc..c4b2996 100644
--- a/queryable/parquet_queryable.go
+++ b/queryable/parquet_queryable.go
@@ -34,11 +34,19 @@ import (
 type ShardsFinderFunction func(ctx context.Context, mint, maxt int64) ([]storage.ParquetShard, error)
 
 type queryableOpts struct {
-	concurrency int
+	concurrency                int
+	rowCountLimitFunc          search.QuotaLimitFunc
+	chunkBytesLimitFunc        search.QuotaLimitFunc
+	dataBytesLimitFunc         search.QuotaLimitFunc
+	materializedSeriesCallback search.MaterializedSeriesFunc
 }
 
 var DefaultQueryableOpts = queryableOpts{
-	concurrency: runtime.GOMAXPROCS(0),
+	concurrency:                runtime.GOMAXPROCS(0),
+	rowCountLimitFunc:          search.NoopQuotaLimitFunc,
+	chunkBytesLimitFunc:        search.NoopQuotaLimitFunc,
+	dataBytesLimitFunc:         search.NoopQuotaLimitFunc,
+	materializedSeriesCallback: search.NoopMaterializedSeriesFunc,
 }
 
 type QueryableOpts func(*queryableOpts)
@@ -50,6 +58,35 @@ func WithConcurrency(concurrency int) QueryableOpts {
 	}
 }
 
+// WithRowCountLimitFunc sets a callback function that returns the limit for the matched row count.
+func WithRowCountLimitFunc(fn search.QuotaLimitFunc) QueryableOpts {
+	return func(opts *queryableOpts) {
+		opts.rowCountLimitFunc = fn
+	}
+}
+
+// WithChunkBytesLimitFunc sets a callback function that returns the limit for fetched chunk column page bytes.
+func WithChunkBytesLimitFunc(fn search.QuotaLimitFunc) QueryableOpts {
+	return func(opts *queryableOpts) {
+		opts.chunkBytesLimitFunc = fn
+	}
+}
+
+// WithDataBytesLimitFunc sets a callback function that returns the limit for fetched data
+// (label and chunk) column page bytes.
+func WithDataBytesLimitFunc(fn search.QuotaLimitFunc) QueryableOpts {
+	return func(opts *queryableOpts) {
+		opts.dataBytesLimitFunc = fn
+	}
+}
+
+// WithMaterializedSeriesCallback sets a callback function to process materialized series.
+func WithMaterializedSeriesCallback(fn search.MaterializedSeriesFunc) QueryableOpts {
+	return func(opts *queryableOpts) {
+		opts.materializedSeriesCallback = fn
+	}
+}
+
 type parquetQueryable struct {
 	shardsFinder ShardsFinderFunction
 	d            *schema.PrometheusParquetChunksDecoder
@@ -191,8 +228,11 @@ func (p parquetQuerier) queryableShards(ctx context.Context, mint, maxt int64) (
 		return nil, err
 	}
 	qBlocks := make([]*queryableShard, len(shards))
+	rowCountQuota := search.NewQuota(p.opts.rowCountLimitFunc(ctx))
+	chunkBytesQuota := search.NewQuota(p.opts.chunkBytesLimitFunc(ctx))
+	dataBytesQuota := search.NewQuota(p.opts.dataBytesLimitFunc(ctx))
 	for i, shard := range shards {
-		qb, err := newQueryableShard(p.opts, shard, p.d)
+		qb, err := newQueryableShard(p.opts, shard, p.d, rowCountQuota, chunkBytesQuota, dataBytesQuota)
 		if err != nil {
 			return nil, err
 		}
@@ -207,12 +247,12 @@ type queryableShard struct {
 	concurrency int
 }
 
-func newQueryableShard(opts *queryableOpts, block storage.ParquetShard, d *schema.PrometheusParquetChunksDecoder) (*queryableShard, error) {
+func newQueryableShard(opts *queryableOpts, block storage.ParquetShard, d *schema.PrometheusParquetChunksDecoder, rowCountQuota *search.Quota, chunkBytesQuota *search.Quota, dataBytesQuota *search.Quota) (*queryableShard, error) {
 	s, err := block.TSDBSchema()
 	if err != nil {
 		return nil, err
 	}
-	m, err := search.NewMaterializer(s, d, block, opts.concurrency)
+	m, err := search.NewMaterializer(s, d, block, opts.concurrency, rowCountQuota, chunkBytesQuota, dataBytesQuota, opts.materializedSeriesCallback)
 	if err != nil {
 		return nil, err
 	}
diff --git a/queryable/parquet_queryable_test.go b/queryable/parquet_queryable_test.go
index ffa7dd7..755f8ff 100644
--- a/queryable/parquet_queryable_test.go
+++ b/queryable/parquet_queryable_test.go
@@ -35,6 +35,7 @@ import (
 
 	"github.com/prometheus-community/parquet-common/convert"
 	"github.com/prometheus-community/parquet-common/schema"
+	"github.com/prometheus-community/parquet-common/search"
 	"github.com/prometheus-community/parquet-common/storage"
 	"github.com/prometheus-community/parquet-common/util"
 )
@@ -269,6 +270,126 @@ func TestQueryable(t *testing.T) {
 					require.Equal(t, expectedLabelValues, lValues)
 				})
 			})
+
+			t.Run("RowCountQuota", func(t *testing.T) {
+				// Test with limited row count quota
+				limitedRowQuota := func(ctx context.Context) int64 {
+					return 10 // Only allow 10 rows
+				}
+				queryable, err := createQueryable(shard, WithRowCountLimitFunc(limitedRowQuota))
+				require.NoError(t, err)
+				querier, err := queryable.Querier(data.MinTime, data.MaxTime)
+				require.NoError(t, err)
+
+				// Try to query more rows than quota allows
+				matchers := []*labels.Matcher{labels.MustNewMatcher(labels.MatchEqual, "unique", "unique_0")}
+				ss := querier.Select(ctx, true, nil, matchers...)
+
+				// This should fail due to row count quota
+				for ss.Next() {
+					_ = ss.At()
+				}
+				require.Error(t, ss.Err())
+				require.Contains(t, ss.Err().Error(), "would fetch too many rows")
+				require.True(t, search.IsResourceExhausted(ss.Err()))
+
+				// Test with sufficient quota
+				sufficientRowQuota := func(ctx context.Context) int64 {
+					return 1000 // Allow 1000 rows
+				}
+				queryable, err = createQueryable(shard, WithRowCountLimitFunc(sufficientRowQuota))
+				require.NoError(t, err)
+				querier, err = queryable.Querier(data.MinTime, data.MaxTime)
+				require.NoError(t, err)
+
+				ss = querier.Select(ctx, true, nil, matchers...)
+				var series []prom_storage.Series
+				for ss.Next() {
+					series = append(series, ss.At())
+				}
+				require.NoError(t, ss.Err())
+				require.NotEmpty(t, series)
+			})
+
+			t.Run("ChunkBytesQuota", func(t *testing.T) {
+				// Test with limited chunk bytes quota
+				limitedChunkQuota := func(ctx context.Context) int64 {
+					return 100 // Only allow 100 bytes
+				}
+				queryable, err := createQueryable(shard, WithChunkBytesLimitFunc(limitedChunkQuota))
+				require.NoError(t, err)
+				querier, err := queryable.Querier(data.MinTime, data.MaxTime)
+				require.NoError(t, err)
+
+				// Try to query chunks that exceed the quota
+				matchers := []*labels.Matcher{labels.MustNewMatcher(labels.MatchEqual, "unique", "unique_0")}
+				ss := querier.Select(ctx, true, nil, matchers...)
+
+				// This should fail due to chunk bytes quota
+				for ss.Next() {
+					_ = ss.At()
+				}
+				require.Error(t, ss.Err())
+				require.Contains(t, ss.Err().Error(), "would fetch too many chunk bytes")
+				require.True(t, search.IsResourceExhausted(ss.Err()))
+
+				// Test with sufficient quota
+				sufficientChunkQuota := func(ctx context.Context) int64 {
+					return 1000000 // Allow 1MB
+				}
+				queryable, err = createQueryable(shard, WithChunkBytesLimitFunc(sufficientChunkQuota))
+				require.NoError(t, err)
+				querier, err = queryable.Querier(data.MinTime, data.MaxTime)
+				require.NoError(t, err)
+
+				ss = querier.Select(ctx, true, nil, matchers...)
+				var series []prom_storage.Series
+				for ss.Next() {
+					series = append(series, ss.At())
+				}
+				require.NoError(t, ss.Err())
+				require.NotEmpty(t, series)
+			})
+
+			t.Run("DataBytesQuota", func(t *testing.T) {
+				// Test with limited data bytes quota
+				limitedDataQuota := func(ctx context.Context) int64 {
+					return 100 // Only allow 100 bytes
+				}
+				queryable, err := createQueryable(shard, WithDataBytesLimitFunc(limitedDataQuota))
+				require.NoError(t, err)
+				querier, err := queryable.Querier(data.MinTime, data.MaxTime)
+				require.NoError(t, err)
+
+				// Try to query data that exceeds the quota
+				matchers := []*labels.Matcher{labels.MustNewMatcher(labels.MatchEqual, "unique", "unique_0")}
+				ss := querier.Select(ctx, true, nil, matchers...)
+
+				// This should fail due to data bytes quota
+				for ss.Next() {
+					_ = ss.At()
+				}
+				require.Error(t, ss.Err())
+				require.Contains(t, ss.Err().Error(), "would fetch too many data bytes")
+				require.True(t, search.IsResourceExhausted(ss.Err()))
+
+				// Test with sufficient quota
+				sufficientDataQuota := func(ctx context.Context) int64 {
+					return 1000000 // Allow 1MB
+				}
+				queryable, err = createQueryable(shard, WithDataBytesLimitFunc(sufficientDataQuota))
+				require.NoError(t, err)
+				querier, err = queryable.Querier(data.MinTime, data.MaxTime)
+				require.NoError(t, err)
+
+				ss = querier.Select(ctx, true, nil, matchers...)
+				var series []prom_storage.Series
+				for ss.Next() {
+					series = append(series, ss.At())
+				}
+				require.NoError(t, ss.Err())
+				require.NotEmpty(t, series)
+			})
 		})
 	}
 }
@@ -338,11 +459,11 @@ func queryWithQueryable(t *testing.T, mint, maxt int64, shard storage.ParquetSha
 	return found
 }
 
-func createQueryable(shard storage.ParquetShard) (prom_storage.Queryable, error) {
+func createQueryable(shard storage.ParquetShard, opts ...QueryableOpts) (prom_storage.Queryable, error) {
 	d := schema.NewPrometheusParquetChunksDecoder(chunkenc.NewPool())
 	return NewParquetQueryable(d, func(ctx context.Context, mint, maxt int64) ([]storage.ParquetShard, error) {
 		return []storage.ParquetShard{shard}, nil
-	})
+	}, opts...)
 }
 
 var benchmarkCases = []struct {
diff --git a/search/limits.go b/search/limits.go
new file mode 100644
index 0000000..dbb96aa
--- /dev/null
+++ b/search/limits.go
@@ -0,0 +1,82 @@
+// Copyright The Prometheus Authors
+// Licensed under the Apache License, Version 2.0 (the "License");
+// you may not use this file except in compliance with the License.
+// You may obtain a copy of the License at
+//
+//     http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing, software
+// distributed under the License is distributed on an "AS IS" BASIS,
+// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+// See the License for the specific language governing permissions and
+// limitations under the License.
+
+// Copyright (c) The Thanos Authors.
+// Licensed under the Apache 2.0 license found in the LICENSE file or at:
+// https://opensource.org/licenses/Apache-2.0
+
+// This package is a modified copy from
+// https://github.com/thanos-io/thanos-parquet-gateway/blob/cfc1279f605d1c629c4afe8b1e2a340e8b15ecdc/internal/limits/limit.go.
+
+package search
+
+import (
+	"context"
+	"errors"
+	"fmt"
+	"sync"
+)
+
+type resourceExhausted struct {
+	used int64
+}
+
+func (re *resourceExhausted) Error() string {
+	return fmt.Sprintf("resource exhausted (used %d)", re.used)
+}
+
+// IsResourceExhausted checks if the error is a resource exhausted error.
+func IsResourceExhausted(err error) bool {
+	var re *resourceExhausted
+	return errors.As(err, &re)
+}
+
+// Quota is a limiter for a resource.
+type Quota struct {
+	mu sync.Mutex
+	q  int64
+	u  int64
+}
+
+// NewQuota creates a new quota with the given limit.
+func NewQuota(n int64) *Quota {
+	return &Quota{q: n, u: n}
+}
+
+// UnlimitedQuota creates a new quota with no limit.
+func UnlimitedQuota() *Quota {
+	return NewQuota(0)
+}
+
+func (q *Quota) Reserve(n int64) error {
+	if q.q == 0 {
+		return nil
+	}
+
+	q.mu.Lock()
+	defer q.mu.Unlock()
+
+	if q.u-n < 0 {
+		return &resourceExhausted{used: q.q}
+	}
+	q.u -= n
+	return nil
+}
+
+// QuotaLimitFunc is a function that returns the limit value.
+type QuotaLimitFunc func(ctx context.Context) int64
+
+// NoopQuotaLimitFunc returns 0, which means no limit.
+func NoopQuotaLimitFunc(ctx context.Context) int64 {
+	return 0
+}
diff --git a/search/limits_test.go b/search/limits_test.go
new file mode 100644
index 0000000..9a69e58
--- /dev/null
+++ b/search/limits_test.go
@@ -0,0 +1,148 @@
+package search
+
+import (
+	"fmt"
+	"testing"
+
+	"github.com/stretchr/testify/require"
+)
+
+func TestIsResourceExhausted(t *testing.T) {
+	tests := []struct {
+		name     string
+		err      error
+		expected bool
+	}{
+		{
+			name:     "resource exhausted error",
+			err:      &resourceExhausted{used: 100},
+			expected: true,
+		},
+		{
+			name:     "wrapped resource exhausted error",
+			err:      fmt.Errorf("wrapped: %w", &resourceExhausted{used: 50}),
+			expected: true,
+		},
+	}
+
+	for _, tt := range tests {
+		t.Run(tt.name, func(t *testing.T) {
+			result := IsResourceExhausted(tt.err)
+			require.Equal(t, tt.expected, result)
+		})
+	}
+}
+
+func TestQuota_Reserve(t *testing.T) {
+	tests := []struct {
+		name         string
+		quotaLimit   int64
+		reservations []int64
+		expectError  bool
+	}{
+		{
+			name:         "reserve within limit",
+			quotaLimit:   100,
+			reservations: []int64{30, 40, 20},
+			expectError:  false,
+		},
+		{
+			name:         "reserve exact limit",
+			quotaLimit:   100,
+			reservations: []int64{100},
+			expectError:  false,
+		},
+		{
+			name:         "reserve beyond limit",
+			quotaLimit:   100,
+			reservations: []int64{50, 60},
+			expectError:  true,
+		},
+		{
+			name:         "reserve zero amount",
+			quotaLimit:   100,
+			reservations: []int64{0},
+			expectError:  false,
+		},
+		{
+			name:         "reserve negative amount",
+			quotaLimit:   100,
+			reservations: []int64{-10},
+			expectError:  false, // This should work as it increases available quota
+		},
+		{
+			name:         "unlimited quota",
+			quotaLimit:   0,
+			reservations: []int64{1000, 2000, 3000},
+			expectError:  false,
+		},
+	}
+
+	for _, tt := range tests {
+		t.Run(tt.name, func(t *testing.T) {
+			quota := NewQuota(tt.quotaLimit)
+			var lastErr error
+
+			for _, amount := range tt.reservations {
+				err := quota.Reserve(amount)
+				if err != nil {
+					lastErr = err
+					require.True(t, IsResourceExhausted(err), "Expected resource exhausted error")
+					break
+				}
+			}
+
+			if tt.expectError {
+				require.Error(t, lastErr, "Expected error but got none")
+			} else {
+				require.NoError(t, lastErr, "Unexpected error")
+			}
+		})
+	}
+}
+
+func TestQuota_ConcurrentReserve(t *testing.T) {
+	quota := NewQuota(1000)
+	const numGoroutines = 10
+	const reservationAmount = 100
+
+	// Use a channel to collect errors from goroutines
+	errChan := make(chan error, numGoroutines)
+
+	// Start multiple goroutines trying to reserve simultaneously
+	for i := 0; i < numGoroutines; i++ {
+		go func() {
+			err := quota.Reserve(reservationAmount)
+			errChan <- err
+		}()
+	}
+
+	// Collect all errors
+	var errors []error
+	for i := 0; i < numGoroutines; i++ {
+		err := <-errChan
+		errors = append(errors, err)
+	}
+
+	// Should have exactly 10 successful reservations (1000 total)
+	successCount := 0
+	exhaustedCount := 0
+	for _, err := range errors {
+		if err == nil {
+			successCount++
+		} else if IsResourceExhausted(err) {
+			exhaustedCount++
+		} else {
+			require.Fail(t, "Unexpected error type", "error: %v", err)
+		}
+	}
+
+	expectedSuccess := 10 // 1000 / 100 = 10
+	require.Equal(t, expectedSuccess, successCount, "Expected %d successful reservations", expectedSuccess)
+	require.Equal(t, 0, exhaustedCount, "Expected 0 exhausted errors")
+
+	// Next reservation should fail
+	err := quota.Reserve(1)
+	require.Error(t, err, "Expected error when reserving beyond quota limit")
+	require.True(t, IsResourceExhausted(err), "Expected resource exhausted error")
+}
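
Editor's note: the short sketch below is not part of this patch; it only illustrates how the Quota API added in search/limits.go above is intended to be used, relying on the exported pieces introduced there (NewQuota, UnlimitedQuota, Reserve, IsResourceExhausted). The package name, the byte budget, and the reservePageBytes helper are invented for the example.

	package search_example // hypothetical consumer package, for illustration only

	import "github.com/prometheus-community/parquet-common/search"

	// reservePageBytes charges each page's size against a shared byte budget and
	// stops as soon as the budget would be exceeded.
	func reservePageBytes(q *search.Quota, pageSizes []int64) error {
		for _, sz := range pageSizes {
			if err := q.Reserve(sz); err != nil {
				// The returned error satisfies search.IsResourceExhausted, so callers
				// can map it to a "query too expensive" style response.
				return err
			}
		}
		return nil
	}

	func example() error {
		q := search.NewQuota(1 << 20) // 1 MiB budget; search.UnlimitedQuota() (limit 0) never rejects
		return reservePageBytes(q, []int64{512 << 10, 512 << 10, 1})
	}
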
diff --git a/search/materialize.go b/search/materialize.go
index 5153818..2f48550 100644
--- a/search/materialize.go
+++ b/search/materialize.go
@@ -17,6 +17,7 @@ import (
 	"context"
 	"fmt"
 	"io"
+	"iter"
 	"maps"
 	"slices"
 	"sync"
@@ -43,12 +44,31 @@ type Materializer struct {
 	concurrency int
 
 	dataColToIndex []int
+
+	rowCountQuota   *Quota
+	chunkBytesQuota *Quota
+	dataBytesQuota  *Quota
+
+	materializedSeriesCallback MaterializedSeriesFunc
+}
+
+// MaterializedSeriesFunc is a callback function that can be used to add limiting or statistics logic for
+// materialized series.
+type MaterializedSeriesFunc func(ctx context.Context, series []prom_storage.ChunkSeries) error
+
+// NoopMaterializedSeriesFunc is a noop callback function that does nothing.
+func NoopMaterializedSeriesFunc(_ context.Context, _ []prom_storage.ChunkSeries) error {
+	return nil
 }
 
 func NewMaterializer(s *schema.TSDBSchema,
 	d *schema.PrometheusParquetChunksDecoder,
 	block storage.ParquetShard,
 	concurrency int,
+	rowCountQuota *Quota,
+	chunkBytesQuota *Quota,
+	dataBytesQuota *Quota,
+	materializeSeriesCallback MaterializedSeriesFunc,
 ) (*Materializer, error) {
 	colIdx, ok := block.LabelsFile().Schema().Lookup(schema.ColIndexes)
 	if !ok {
@@ -66,19 +86,26 @@
 	}
 
 	return &Materializer{
-		s:              s,
-		d:              d,
-		b:              block,
-		colIdx:         colIdx.ColumnIndex,
-		concurrency:    concurrency,
-		partitioner:    util.NewGapBasedPartitioner(block.ChunksFile().Cfg.PagePartitioningMaxGapSize),
-		dataColToIndex: dataColToIndex,
+		s:                          s,
+		d:                          d,
+		b:                          block,
+		colIdx:                     colIdx.ColumnIndex,
+		concurrency:                concurrency,
+		partitioner:                util.NewGapBasedPartitioner(block.ChunksFile().Cfg.PagePartitioningMaxGapSize),
+		dataColToIndex:             dataColToIndex,
+		rowCountQuota:              rowCountQuota,
+		chunkBytesQuota:            chunkBytesQuota,
+		dataBytesQuota:             dataBytesQuota,
+		materializedSeriesCallback: materializeSeriesCallback,
 	}, nil
 }
 
 // Materialize reconstructs the ChunkSeries that belong to the specified row ranges (rr).
 // It uses the row group index (rgi) and time bounds (mint, maxt) to filter and decode the series.
 func (m *Materializer) Materialize(ctx context.Context, rgi int, mint, maxt int64, skipChunks bool, rr []RowRange) ([]prom_storage.ChunkSeries, error) {
+	if err := m.checkRowCountQuota(rr); err != nil {
+		return nil, err
+	}
 	sLbls, err := m.materializeAllLabels(ctx, rgi, rr)
 	if err != nil {
 		return nil, errors.Wrapf(err, "error materializing labels")
@@ -106,6 +133,10 @@ func (m *Materializer) Materialize(ctx context.Context, rgi int, mint, maxt int6
 			return len(cs.(*concreteChunksSeries).chks) == 0
 		})
 	}
+
+	if err := m.materializedSeriesCallback(ctx, results); err != nil {
+		return nil, err
+	}
 	return results, err
 }
 
@@ -125,7 +156,7 @@ func (m *Materializer) MaterializeAllLabelNames() []string {
 func (m *Materializer) MaterializeLabelNames(ctx context.Context, rgi int, rr []RowRange) ([]string, error) {
 	labelsRg := m.b.LabelsFile().RowGroups()[rgi]
 	cc := labelsRg.ColumnChunks()[m.colIdx]
-	colsIdxs, err := m.materializeColumn(ctx, m.b.LabelsFile(), rgi, cc, rr)
+	colsIdxs, err := m.materializeColumn(ctx, m.b.LabelsFile(), rgi, cc, rr, false)
 	if err != nil {
 		return nil, errors.Wrap(err, "materializer failed to materialize columns")
 	}
@@ -164,7 +195,7 @@ func (m *Materializer) MaterializeLabelValues(ctx context.Context, name string,
 		return []string{}, nil
 	}
 	cc := labelsRg.ColumnChunks()[cIdx.ColumnIndex]
-	values, err := m.materializeColumn(ctx, m.b.LabelsFile(), rgi, cc, rr)
+	values, err := m.materializeColumn(ctx, m.b.LabelsFile(), rgi, cc, rr, false)
 	if err != nil {
 		return nil, errors.Wrap(err, "materializer failed to materialize columns")
 	}
@@ -208,7 +239,7 @@ func (m *Materializer) MaterializeAllLabelValues(ctx context.Context, name strin
 func (m *Materializer) materializeAllLabels(ctx context.Context, rgi int, rr []RowRange) ([][]labels.Label, error) {
 	labelsRg := m.b.LabelsFile().RowGroups()[rgi]
 	cc := labelsRg.ColumnChunks()[m.colIdx]
-	colsIdxs, err := m.materializeColumn(ctx, m.b.LabelsFile(), rgi, cc, rr)
+	colsIdxs, err := m.materializeColumn(ctx, m.b.LabelsFile(), rgi, cc, rr, false)
 	if err != nil {
 		return nil, errors.Wrap(err, "materializer failed to materialize columns")
 	}
@@ -232,7 +263,7 @@ func (m *Materializer) materializeAllLabels(ctx context.Context, rgi int, rr []R
 	for cIdx, v := range colsMap {
 		errGroup.Go(func() error {
 			cc := labelsRg.ColumnChunks()[cIdx]
-			values, err := m.materializeColumn(ctx, m.b.LabelsFile(), rgi, cc, rr)
+			values, err := m.materializeColumn(ctx, m.b.LabelsFile(), rgi, cc, rr, false)
 			if err != nil {
 				return errors.Wrap(err, "failed to materialize labels values")
 			}
@@ -279,7 +310,7 @@ func (m *Materializer) materializeChunks(ctx context.Context, rgi int, mint, max
 	r := make([][]chunks.Meta, totalRows(rr))
 	for i := minDataCol; i <= min(maxDataCol, len(m.dataColToIndex)-1); i++ {
-		values, err := m.materializeColumn(ctx, m.b.ChunksFile(), rgi, rg.ColumnChunks()[m.dataColToIndex[i]], rr)
+		values, err := m.materializeColumn(ctx, m.b.ChunksFile(), rgi, rg.ColumnChunks()[m.dataColToIndex[i]], rr, true)
 		if err != nil {
 			return r, err
 		}
@@ -296,7 +327,7 @@ func (m *Materializer) materializeChunks(ctx context.Context, rgi int, mint, max
 	return r, nil
 }
 
-func (m *Materializer) materializeColumn(ctx context.Context, file *storage.ParquetFile, rgi int, cc parquet.ColumnChunk, rr []RowRange) ([]parquet.Value, error) {
+func (m *Materializer) materializeColumn(ctx context.Context, file *storage.ParquetFile, rgi int, cc parquet.ColumnChunk, rr []RowRange, chunkColumn bool) ([]parquet.Value, error) {
 	if len(rr) == 0 {
 		return nil, nil
 	}
@@ -331,6 +362,9 @@ func (m *Materializer) materializeColumn(ctx context.Context, file *storage.Parq
 			}
 		}
 	}
+	if err := m.checkBytesQuota(maps.Keys(pagesToRowsMap), oidx, chunkColumn); err != nil {
+		return nil, err
+	}
 
 	pageRanges := m.coalescePageRanges(pagesToRowsMap, oidx)
 
@@ -464,6 +498,34 @@ func (m *Materializer) coalescePageRanges(pagedIdx map[int][]RowRange, offset pa
 	return r
 }
 
+func (m *Materializer) checkRowCountQuota(rr []RowRange) error {
+	if err := m.rowCountQuota.Reserve(totalRows(rr)); err != nil {
+		return fmt.Errorf("would fetch too many rows: %w", err)
+	}
+	return nil
+}
+
+func (m *Materializer) checkBytesQuota(pages iter.Seq[int], oidx parquet.OffsetIndex, chunkColumn bool) error {
+	total := totalBytes(pages, oidx)
+	if chunkColumn {
+		if err := m.chunkBytesQuota.Reserve(total); err != nil {
+			return fmt.Errorf("would fetch too many chunk bytes: %w", err)
+		}
+	}
+	if err := m.dataBytesQuota.Reserve(total); err != nil {
+		return fmt.Errorf("would fetch too many data bytes: %w", err)
+	}
+	return nil
+}
+
+func totalBytes(pages iter.Seq[int], oidx parquet.OffsetIndex) int64 {
+	res := int64(0)
+	for i := range pages {
+		res += oidx.CompressedPageSize(i)
+	}
+	return res
+}
+
 type valuesIterator struct {
 	p parquet.Page
diff --git a/search/materialize_test.go b/search/materialize_test.go
index e46d9d7..6240379 100644
--- a/search/materialize_test.go
+++ b/search/materialize_test.go
@@ -110,7 +110,7 @@ func TestMaterializeE2E(t *testing.T) {
 	s, err := shard.TSDBSchema()
 	require.NoError(t, err)
 	d := schema.NewPrometheusParquetChunksDecoder(chunkenc.NewPool())
-	m, err := NewMaterializer(s, d, shard, 10)
+	m, err := NewMaterializer(s, d, shard, 10, UnlimitedQuota(), UnlimitedQuota(), UnlimitedQuota(), NoopMaterializedSeriesFunc)
 	require.NoError(t, err)
 	rr := []RowRange{{from: int64(0), count: shard.LabelsFile().RowGroups()[0].NumRows()}}
 	ctx, cancel := context.WithCancel(ctx)
@@ -128,12 +128,96 @@ func TestMaterializeE2E(t *testing.T) {
 		s, err := shard.TSDBSchema()
 		require.NoError(t, err)
 		d := schema.NewPrometheusParquetChunksDecoder(chunkenc.NewPool())
-		m, err := NewMaterializer(s, d, shard, 10)
+		m, err := NewMaterializer(s, d, shard, 10, UnlimitedQuota(), UnlimitedQuota(), UnlimitedQuota(), NoopMaterializedSeriesFunc)
 		require.NoError(t, err)
 		rr := []RowRange{{from: int64(0), count: shard.LabelsFile().RowGroups()[0].NumRows()}}
 		_, err = m.Materialize(ctx, 0, data.MinTime, data.MaxTime, false, rr)
 		require.NoError(t, err)
 	})
+
+	t.Run("RowCountQuota", func(t *testing.T) {
+		s, err := shard.TSDBSchema()
+		require.NoError(t, err)
+		d := schema.NewPrometheusParquetChunksDecoder(chunkenc.NewPool())
+
+		// Test with limited row count quota
+		limitedRowCountQuota := NewQuota(10) // Only allow 10 rows
+		m, err := NewMaterializer(s, d, shard, 10, limitedRowCountQuota, UnlimitedQuota(), UnlimitedQuota(), NoopMaterializedSeriesFunc)
+		require.NoError(t, err)
+
+		// Try to materialize more rows than quota allows
+		rr := []RowRange{{from: int64(0), count: 20}} // 20 rows
+		_, err = m.Materialize(ctx, 0, data.MinTime, data.MaxTime, false, rr)
+		require.Error(t, err)
+		require.Contains(t, err.Error(), "would fetch too many rows")
+		require.True(t, IsResourceExhausted(err))
+
+		// Test with sufficient quota
+		sufficientRowCountQuota := NewQuota(1000) // Allow 1000 rows
+		m, err = NewMaterializer(s, d, shard, 10, sufficientRowCountQuota, UnlimitedQuota(), UnlimitedQuota(), NoopMaterializedSeriesFunc)
+		require.NoError(t, err)
+
+		rr = []RowRange{{from: int64(0), count: 50}} // 50 rows
+		series, err := m.Materialize(ctx, 0, data.MinTime, data.MaxTime, false, rr)
+		require.NoError(t, err)
+		require.NotEmpty(t, series)
+	})
+
+	t.Run("ChunkBytesQuota", func(t *testing.T) {
+		s, err := shard.TSDBSchema()
+		require.NoError(t, err)
+		d := schema.NewPrometheusParquetChunksDecoder(chunkenc.NewPool())
+
+		// Test with limited chunk bytes quota
+		limitedChunkBytesQuota := NewQuota(100) // Only allow 100 bytes
+		m, err := NewMaterializer(s, d, shard, 10, UnlimitedQuota(), limitedChunkBytesQuota, UnlimitedQuota(), NoopMaterializedSeriesFunc)
+		require.NoError(t, err)
+
+		// Try to materialize chunks that exceed the quota
+		rr := []RowRange{{from: int64(0), count: 100}} // Large range to trigger chunk reading
+		_, err = m.Materialize(ctx, 0, data.MinTime, data.MaxTime, false, rr)
+		require.Error(t, err)
+		require.Contains(t, err.Error(), "would fetch too many chunk bytes")
+		require.True(t, IsResourceExhausted(err))
+
+		// Test with sufficient quota
+		sufficientChunkBytesQuota := NewQuota(1000000) // Allow 1MB
+		m, err = NewMaterializer(s, d, shard, 10, UnlimitedQuota(), sufficientChunkBytesQuota, UnlimitedQuota(), NoopMaterializedSeriesFunc)
+		require.NoError(t, err)
+
+		rr = []RowRange{{from: int64(0), count: 10}} // Small range
+		series, err := m.Materialize(ctx, 0, data.MinTime, data.MaxTime, false, rr)
+		require.NoError(t, err)
+		require.NotEmpty(t, series)
+	})
+
+	t.Run("DataBytesQuota", func(t *testing.T) {
+		s, err := shard.TSDBSchema()
+		require.NoError(t, err)
+		d := schema.NewPrometheusParquetChunksDecoder(chunkenc.NewPool())
+
+		// Test with limited data bytes quota
+		limitedDataBytesQuota := NewQuota(100) // Only allow 100 bytes
+		m, err := NewMaterializer(s, d, shard, 10, UnlimitedQuota(), UnlimitedQuota(), limitedDataBytesQuota, NoopMaterializedSeriesFunc)
+		require.NoError(t, err)
+
+		// Try to materialize data that exceeds the quota
+		rr := []RowRange{{from: int64(0), count: 100}} // Large range to trigger data reading
+		_, err = m.Materialize(ctx, 0, data.MinTime, data.MaxTime, false, rr)
+		require.Error(t, err)
+		require.Contains(t, err.Error(), "would fetch too many data bytes")
+		require.True(t, IsResourceExhausted(err))
+
+		// Test with sufficient quota
+		sufficientDataBytesQuota := NewQuota(1000000) // Allow 1MB
+		m, err = NewMaterializer(s, d, shard, 10, UnlimitedQuota(), UnlimitedQuota(), sufficientDataBytesQuota, NoopMaterializedSeriesFunc)
+		require.NoError(t, err)
+
+		rr = []RowRange{{from: int64(0), count: 10}} // Small range
+		series, err := m.Materialize(ctx, 0, data.MinTime, data.MaxTime, false, rr)
+		require.NoError(t, err)
+		require.NotEmpty(t, series)
+	})
 }
 
 func convertToParquet(t *testing.T, ctx context.Context, bkt *filesystem.Bucket, data util.TestData, h convert.Convertible, opts ...storage.FileOption) storage.ParquetShard {
@@ -170,7 +254,7 @@ func query(t *testing.T, mint, maxt int64, shard storage.ParquetShard, constrain
 	s, err := shard.TSDBSchema()
 	require.NoError(t, err)
 	d := schema.NewPrometheusParquetChunksDecoder(chunkenc.NewPool())
-	m, err := NewMaterializer(s, d, shard, 10)
+	m, err := NewMaterializer(s, d, shard, 10, UnlimitedQuota(), UnlimitedQuota(), UnlimitedQuota(), NoopMaterializedSeriesFunc)
 	require.NoError(t, err)
 
 	found := make([]prom_storage.ChunkSeries, 0, 100)
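
Editor's note: the sketch below is not part of this patch; it shows how a consumer might wire the new limit options into NewParquetQueryable. The package and import paths are assumed from the repository layout, and the concrete limit values and the no-op stats callback are illustrative only; returning 0 from any limit func keeps the default "no limit" behavior.

	package queryable_example // hypothetical consumer package, for illustration only

	import (
		"context"

		"github.com/prometheus/prometheus/tsdb/chunkenc"

		"github.com/prometheus-community/parquet-common/queryable"
		"github.com/prometheus-community/parquet-common/schema"

		prom_storage "github.com/prometheus/prometheus/storage"
	)

	func newLimitedQueryable(finder queryable.ShardsFinderFunction) (prom_storage.Queryable, error) {
		d := schema.NewPrometheusParquetChunksDecoder(chunkenc.NewPool())
		return queryable.NewParquetQueryable(d, finder,
			// Each limit func is evaluated when shards are resolved for a query (see
			// queryableShards above), so it can read per-tenant limits from ctx.
			queryable.WithRowCountLimitFunc(func(ctx context.Context) int64 { return 100_000 }),
			queryable.WithChunkBytesLimitFunc(func(ctx context.Context) int64 { return 64 << 20 }),
			queryable.WithDataBytesLimitFunc(func(ctx context.Context) int64 { return 128 << 20 }),
			// The callback sees each batch of materialized series and can veto it by
			// returning an error, or simply record statistics and return nil.
			queryable.WithMaterializedSeriesCallback(func(ctx context.Context, series []prom_storage.ChunkSeries) error {
				return nil
			}),
		)
	}
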