Merged
Changes from 1 commit
47 changes: 42 additions & 5 deletions queryable/parquet_queryable.go
@@ -34,11 +34,21 @@ import (
type ShardsFinderFunction func(ctx context.Context, mint, maxt int64) ([]storage.ParquetShard, error)

type queryableOpts struct {
-	concurrency int
+	concurrency int
+	rowCountLimitFunc search.QuotaLimitFunc
+	chunkBytesLimitFunc search.QuotaLimitFunc
+	dataBytesLimitFunc search.QuotaLimitFunc
+	materializedSeriesCallback search.MaterializedSeriesFunc
}

var DefaultQueryableOpts = queryableOpts{
-	concurrency: runtime.GOMAXPROCS(0),
+	concurrency: runtime.GOMAXPROCS(0),
+	rowCountLimitFunc: search.NoopQuotaLimitFunc,
+	chunkBytesLimitFunc: search.NoopQuotaLimitFunc,
+	dataBytesLimitFunc: search.NoopQuotaLimitFunc,
+	materializedSeriesCallback: func(_ context.Context, _ []prom_storage.ChunkSeries) error {
+		return nil
+	},
}

type QueryableOpts func(*queryableOpts)
@@ -50,6 +60,30 @@ func WithConcurrency(concurrency int) QueryableOpts {
}
}

func WithRowCountLimitFunc(fn search.QuotaLimitFunc) QueryableOpts {
return func(opts *queryableOpts) {
opts.rowCountLimitFunc = fn
}
}

func WithChunkBytesLimitFunc(fn search.QuotaLimitFunc) QueryableOpts {
return func(opts *queryableOpts) {
opts.chunkBytesLimitFunc = fn
}
}

func WithDataBytesLimitFunc(fn search.QuotaLimitFunc) QueryableOpts {
return func(opts *queryableOpts) {
opts.dataBytesLimitFunc = fn
}
}

func WithMaterializedSeriesCallback(fn search.MaterializedSeriesFunc) QueryableOpts {
return func(opts *queryableOpts) {
opts.materializedSeriesCallback = fn
}
}
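For orientation, here is how these options are meant to compose at a call site. A minimal sketch; the module import path and the concrete limit values are assumptions for illustration, not part of this change:

import (
	"context"

	"github.com/prometheus-community/parquet-common/queryable"
)

// Cap each query at 1M rows, 512 MiB of chunk bytes and 1 GiB of fetched bytes.
// Returning 0 (what search.NoopQuotaLimitFunc does) leaves a limit disabled.
var exampleOpts = []queryable.QueryableOpts{
	queryable.WithConcurrency(8),
	queryable.WithRowCountLimitFunc(func(context.Context) int64 { return 1_000_000 }),
	queryable.WithChunkBytesLimitFunc(func(context.Context) int64 { return 512 << 20 }),
	queryable.WithDataBytesLimitFunc(func(context.Context) int64 { return 1 << 30 }),
}

As queryableShards below shows, the three quotas are then instantiated once per query and shared by every shard that query touches.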

type parquetQueryable struct {
shardsFinder ShardsFinderFunction
d *schema.PrometheusParquetChunksDecoder
@@ -191,8 +225,11 @@ func (p parquetQuerier) queryableShards(ctx context.Context, mint, maxt int64) (
return nil, err
}
qBlocks := make([]*queryableShard, len(shards))
rowCountQuota := search.NewQuota(p.opts.rowCountLimitFunc(ctx))
chunkBytesQuota := search.NewQuota(p.opts.chunkBytesLimitFunc(ctx))
dataBytesQuota := search.NewQuota(p.opts.dataBytesLimitFunc(ctx))
for i, shard := range shards {
-	qb, err := newQueryableShard(p.opts, shard, p.d)
+	qb, err := newQueryableShard(p.opts, shard, p.d, rowCountQuota, chunkBytesQuota, dataBytesQuota)
if err != nil {
return nil, err
}
@@ -207,12 +244,12 @@ type queryableShard struct {
concurrency int
}

-func newQueryableShard(opts *queryableOpts, block storage.ParquetShard, d *schema.PrometheusParquetChunksDecoder) (*queryableShard, error) {
+func newQueryableShard(opts *queryableOpts, block storage.ParquetShard, d *schema.PrometheusParquetChunksDecoder, rowCountQuota *search.Quota, chunkBytesQuota *search.Quota, dataBytesQuota *search.Quota) (*queryableShard, error) {
s, err := block.TSDBSchema()
if err != nil {
return nil, err
}
-	m, err := search.NewMaterializer(s, d, block, opts.concurrency)
+	m, err := search.NewMaterializer(s, d, block, opts.concurrency, rowCountQuota, chunkBytesQuota, dataBytesQuota, opts.materializedSeriesCallback)
if err != nil {
return nil, err
}
76 changes: 76 additions & 0 deletions search/limits.go
@@ -0,0 +1,76 @@
// Copyright The Prometheus Authors
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

// Copyright (c) The Thanos Authors.
// Licensed under the Apache 2.0 license found in the LICENSE file or at:
// https://opensource.org/licenses/Apache-2.0

// This file is a modified copy of
// https://github.com/thanos-io/thanos-parquet-gateway/blob/cfc1279f605d1c629c4afe8b1e2a340e8b15ecdc/internal/limits/limit.go.

package search

import (
"context"
"errors"
"fmt"
"sync"
)

type resourceExhausted struct {
used int64
}

func (re *resourceExhausted) Error() string {
return fmt.Sprintf("resource exhausted (used %d)", re.used)
}

func IsResourceExhausted(err error) bool {
var re *resourceExhausted
return errors.As(err, &re)
}

type Quota struct {
mu sync.Mutex
q int64
u int64
}

func NewQuota(n int64) *Quota {
return &Quota{q: n, u: n}
}

func UnlimitedQuota() *Quota {
return NewQuota(0)
}

func (q *Quota) Reserve(n int64) error {
if q.q == 0 {
return nil
}

q.mu.Lock()
defer q.mu.Unlock()

if q.u-n < 0 {
return &resourceExhausted{used: q.q}
}
q.u -= n
return nil
}

type QuotaLimitFunc func(ctx context.Context) int64

func NoopQuotaLimitFunc(ctx context.Context) int64 {
return 0
}
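Putting the pieces together: a quota built with NewQuota(0), which is what NoopQuotaLimitFunc yields, is unlimited, while any positive budget fails once cumulative reservations would exceed it. A small self-contained illustration (module import path assumed):

package main

import (
	"fmt"

	"github.com/prometheus-community/parquet-common/search"
)

func main() {
	q := search.NewQuota(1000)
	fmt.Println(q.Reserve(600))                  // <nil>: 400 of 1000 remain
	err := q.Reserve(600)                        // only 400 remain, so this fails
	fmt.Println(search.IsResourceExhausted(err)) // true
}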
83 changes: 70 additions & 13 deletions search/materialize.go
@@ -17,6 +17,7 @@ import (
"context"
"fmt"
"io"
"iter"
"maps"
"slices"
"sync"
@@ -43,12 +44,26 @@ type Materializer struct {
concurrency int

dataColToIndex []int

rowCountQuota *Quota
chunkBytesQuota *Quota
dataBytesQuota *Quota

materializedSeriesCallback MaterializedSeriesFunc
}

// MaterializedSeriesFunc is a callback that can be used to apply limits to, or collect statistics on,
// materialized series.
type MaterializedSeriesFunc func(ctx context.Context, series []prom_storage.ChunkSeries) error
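One concrete use of this hook is a per-call series limiter. A hypothetical helper, not part of this change:

// limitSeriesCount builds a MaterializedSeriesFunc that rejects any single
// Materialize call which returns more than limit series.
func limitSeriesCount(limit int) MaterializedSeriesFunc {
	return func(_ context.Context, series []prom_storage.ChunkSeries) error {
		if len(series) > limit {
			return fmt.Errorf("materialized %d series, limit is %d", len(series), limit)
		}
		return nil
	}
}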

func NewMaterializer(s *schema.TSDBSchema,
d *schema.PrometheusParquetChunksDecoder,
block storage.ParquetShard,
concurrency int,
rowCountQuota *Quota,
chunkBytesQuota *Quota,
dataBytesQuota *Quota,
materializeSeriesCallback MaterializedSeriesFunc,
) (*Materializer, error) {
colIdx, ok := block.LabelsFile().Schema().Lookup(schema.ColIndexes)
if !ok {
@@ -66,19 +81,26 @@ }
}

return &Materializer{
-		s: s,
-		d: d,
-		b: block,
-		colIdx: colIdx.ColumnIndex,
-		concurrency: concurrency,
-		partitioner: util.NewGapBasedPartitioner(block.ChunksFile().Cfg.PagePartitioningMaxGapSize),
-		dataColToIndex: dataColToIndex,
+		s: s,
+		d: d,
+		b: block,
+		colIdx: colIdx.ColumnIndex,
+		concurrency: concurrency,
+		partitioner: util.NewGapBasedPartitioner(block.ChunksFile().Cfg.PagePartitioningMaxGapSize),
+		dataColToIndex: dataColToIndex,
+		rowCountQuota: rowCountQuota,
+		chunkBytesQuota: chunkBytesQuota,
+		dataBytesQuota: dataBytesQuota,
+		materializedSeriesCallback: materializeSeriesCallback,
}, nil
}

// Materialize reconstructs the ChunkSeries that belong to the specified row ranges (rr).
// It uses the row group index (rgi) and time bounds (mint, maxt) to filter and decode the series.
func (m *Materializer) Materialize(ctx context.Context, rgi int, mint, maxt int64, skipChunks bool, rr []RowRange) ([]prom_storage.ChunkSeries, error) {
if err := m.checkRowCountQuota(rr); err != nil {
return nil, err
}
sLbls, err := m.materializeAllLabels(ctx, rgi, rr)
if err != nil {
return nil, errors.Wrapf(err, "error materializing labels")
@@ -106,6 +128,10 @@ func (m *Materializer) Materialize(ctx context.Context, rgi int, mint, maxt int64, skipChunks bool, rr []RowRange) ([]prom_storage.ChunkSeries, error) {
return len(cs.(*concreteChunksSeries).chks) == 0
})
}

if err := m.materializedSeriesCallback(ctx, results); err != nil {
return nil, err
}
return results, err
}
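Note that the quota checks wrap their errors with %w, so callers can separate limit violations from genuine read failures. A hypothetical call-site sketch:

series, err := m.Materialize(ctx, rgi, mint, maxt, false, rr)
if err != nil {
	if IsResourceExhausted(err) {
		// Report a "query too large" client error instead of a server fault.
	}
	return nil, err
}
return series, nil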

@@ -125,7 +151,7 @@ func (m *Materializer) MaterializeAllLabelNames() []string {
func (m *Materializer) MaterializeLabelNames(ctx context.Context, rgi int, rr []RowRange) ([]string, error) {
labelsRg := m.b.LabelsFile().RowGroups()[rgi]
cc := labelsRg.ColumnChunks()[m.colIdx]
-	colsIdxs, err := m.materializeColumn(ctx, m.b.LabelsFile(), rgi, cc, rr)
+	colsIdxs, err := m.materializeColumn(ctx, m.b.LabelsFile(), rgi, cc, rr, false)
if err != nil {
return nil, errors.Wrap(err, "materializer failed to materialize columns")
}
@@ -164,7 +190,7 @@ func (m *Materializer) MaterializeLabelValues(ctx context.Context, name string,
return []string{}, nil
}
cc := labelsRg.ColumnChunks()[cIdx.ColumnIndex]
-	values, err := m.materializeColumn(ctx, m.b.LabelsFile(), rgi, cc, rr)
+	values, err := m.materializeColumn(ctx, m.b.LabelsFile(), rgi, cc, rr, false)
if err != nil {
return nil, errors.Wrap(err, "materializer failed to materialize columns")
}
@@ -208,7 +234,7 @@ func (m *Materializer) MaterializeAllLabelValues(ctx context.Context, name string,
func (m *Materializer) materializeAllLabels(ctx context.Context, rgi int, rr []RowRange) ([][]labels.Label, error) {
labelsRg := m.b.LabelsFile().RowGroups()[rgi]
cc := labelsRg.ColumnChunks()[m.colIdx]
-	colsIdxs, err := m.materializeColumn(ctx, m.b.LabelsFile(), rgi, cc, rr)
+	colsIdxs, err := m.materializeColumn(ctx, m.b.LabelsFile(), rgi, cc, rr, false)
if err != nil {
return nil, errors.Wrap(err, "materializer failed to materialize columns")
}
@@ -232,7 +258,7 @@ for cIdx, v := range colsMap {
for cIdx, v := range colsMap {
errGroup.Go(func() error {
cc := labelsRg.ColumnChunks()[cIdx]
-		values, err := m.materializeColumn(ctx, m.b.LabelsFile(), rgi, cc, rr)
+		values, err := m.materializeColumn(ctx, m.b.LabelsFile(), rgi, cc, rr, false)
if err != nil {
return errors.Wrap(err, "failed to materialize labels values")
}
@@ -279,7 +305,7 @@ func (m *Materializer) materializeChunks(ctx context.Context, rgi int, mint, maxt
r := make([][]chunks.Meta, totalRows(rr))

for i := minDataCol; i <= min(maxDataCol, len(m.dataColToIndex)-1); i++ {
-	values, err := m.materializeColumn(ctx, m.b.ChunksFile(), rgi, rg.ColumnChunks()[m.dataColToIndex[i]], rr)
+	values, err := m.materializeColumn(ctx, m.b.ChunksFile(), rgi, rg.ColumnChunks()[m.dataColToIndex[i]], rr, true)
if err != nil {
return r, err
}
@@ -296,7 +322,7 @@ return r, nil
return r, nil
}

func (m *Materializer) materializeColumn(ctx context.Context, file *storage.ParquetFile, rgi int, cc parquet.ColumnChunk, rr []RowRange) ([]parquet.Value, error) {
func (m *Materializer) materializeColumn(ctx context.Context, file *storage.ParquetFile, rgi int, cc parquet.ColumnChunk, rr []RowRange, chunkColumn bool) ([]parquet.Value, error) {
if len(rr) == 0 {
return nil, nil
}
Expand Down Expand Up @@ -331,6 +357,9 @@ func (m *Materializer) materializeColumn(ctx context.Context, file *storage.Parq
}
}
}
if err := m.checkBytesQuota(maps.Keys(pagesToRowsMap), oidx, chunkColumn); err != nil {
return nil, err
}

pageRanges := m.coalescePageRanges(pagesToRowsMap, oidx)

@@ -464,6 +493,34 @@ func (m *Materializer) coalescePageRanges(pagedIdx map[int][]RowRange, offset parquet.OffsetIndex
return r
}

func (m *Materializer) checkRowCountQuota(rr []RowRange) error {
if err := m.rowCountQuota.Reserve(totalRows(rr)); err != nil {
return fmt.Errorf("would fetch too many rows: %w", err)
}
return nil
}

func (m *Materializer) checkBytesQuota(pages iter.Seq[int], oidx parquet.OffsetIndex, chunkColumn bool) error {
total := totalBytes(pages, oidx)
if chunkColumn {
if err := m.chunkBytesQuota.Reserve(total); err != nil {
return fmt.Errorf("would fetch too many chunk bytes: %w", err)
}
}
if err := m.dataBytesQuota.Reserve(total); err != nil {
return fmt.Errorf("would fetch too many data bytes: %w", err)
}
return nil
}

func totalBytes(pages iter.Seq[int], oidx parquet.OffsetIndex) int64 {
res := int64(0)
for i := range pages {
res += oidx.CompressedPageSize(i)
}
return res
}

type valuesIterator struct {
p parquet.Page
