3 changes: 3 additions & 0 deletions pkg/api/queryapi/query_api.go
@@ -18,6 +18,7 @@ import (
"github.com/weaveworks/common/httpgrpc"

"github.com/cortexproject/cortex/pkg/engine"
"github.com/cortexproject/cortex/pkg/querier"
"github.com/cortexproject/cortex/pkg/util"
"github.com/cortexproject/cortex/pkg/util/api"
)
@@ -98,6 +99,7 @@ func (q *QueryAPI) RangeQueryHandler(r *http.Request) (result apiFuncResult) {
}

ctx = engine.AddEngineTypeToContext(ctx, r)
ctx = querier.AddBlockStoreTypeToContext(ctx, r.Header.Get(querier.BlockStoreTypeHeader))
qry, err := q.queryEngine.NewRangeQuery(ctx, q.queryable, opts, r.FormValue("query"), convertMsToTime(start), convertMsToTime(end), convertMsToDuration(step))
if err != nil {
return invalidParamError(httpgrpc.Errorf(http.StatusBadRequest, "%s", err.Error()), "query")
@@ -153,6 +155,7 @@ func (q *QueryAPI) InstantQueryHandler(r *http.Request) (result apiFuncResult) {
}

ctx = engine.AddEngineTypeToContext(ctx, r)
ctx = querier.AddBlockStoreTypeToContext(ctx, r.Header.Get(querier.BlockStoreTypeHeader))
qry, err := q.queryEngine.NewInstantQuery(ctx, q.queryable, opts, r.FormValue("query"), convertMsToTime(ts))
if err != nil {
return invalidParamError(httpgrpc.Errorf(http.StatusBadRequest, "%s", err.Error()), "query")
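Both handlers now copy the X-Cortex-BlockStore-Type request header into the query context before the query is built. A minimal client-side sketch of the per-request override, assuming a querier listening on localhost:9009 with the usual Prometheus-compatible range-query path (host, port, path, and time range below are illustrative, not part of this change):

package main

import (
	"fmt"
	"io"
	"net/http"
	"net/url"
)

func main() {
	params := url.Values{}
	params.Set("query", "up")
	params.Set("start", "1700000000")
	params.Set("end", "1700003600")
	params.Set("step", "60")

	req, err := http.NewRequest(http.MethodGet,
		"http://localhost:9009/prometheus/api/v1/query_range?"+params.Encode(), nil)
	if err != nil {
		panic(err)
	}

	// The override added in this PR: ask the querier to serve this query from
	// parquet blocks. Valid values are "tsdb" and "parquet"; any other value is
	// ignored and the configured default applies.
	req.Header.Set("X-Cortex-BlockStore-Type", "parquet")

	resp, err := http.DefaultClient.Do(req)
	if err != nil {
		panic(err)
	}
	defer resp.Body.Close()

	body, _ := io.ReadAll(resp.Body)
	fmt.Println(resp.Status, string(body))
}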
60 changes: 50 additions & 10 deletions pkg/querier/parquet_queryable.go
@@ -33,6 +33,39 @@ import (
"github.com/cortexproject/cortex/pkg/util/validation"
)

type blockStorageType struct{}

var blockStorageKey = blockStorageType{}

const BlockStoreTypeHeader = "X-Cortex-BlockStore-Type"

type blockStoreType string

const (
tsdbBlockStore blockStoreType = "tsdb"
parquetBlockStore blockStoreType = "parquet"
)

var validBlockStoreTypes = []blockStoreType{tsdbBlockStore, parquetBlockStore}

// AddBlockStoreTypeToContext checks the HTTP header value and sets the block store
// key in the context if a valid store type is provided.
func AddBlockStoreTypeToContext(ctx context.Context, storeType string) context.Context {
ng := blockStoreType(storeType)
switch ng {
case tsdbBlockStore, parquetBlockStore:
return context.WithValue(ctx, blockStorageKey, ng)
}
return ctx
}

func getBlockStoreType(ctx context.Context, defaultBlockStoreType blockStoreType) blockStoreType {
if ng, ok := ctx.Value(blockStorageKey).(blockStoreType); ok {
return ng
}
return defaultBlockStoreType
}

type parquetQueryableFallbackMetrics struct {
blocksQueriedTotal *prometheus.CounterVec
selectCount *prometheus.CounterVec
@@ -69,6 +102,8 @@ type parquetQueryableWithFallback struct {

limits *validation.Overrides
logger log.Logger

defaultBlockStoreType blockStoreType
}

func NewParquetQueryable(
@@ -153,6 +188,7 @@ func NewParquetQueryable(
metrics: newParquetQueryableFallbackMetrics(reg),
limits: limits,
logger: logger,
defaultBlockStoreType: blockStoreType(config.ParquetQueryableDefaultBlockStore),
}

p.Service = services.NewBasicService(p.starting, p.running, p.stopping)
@@ -195,15 +231,16 @@ func (p *parquetQueryableWithFallback) Querier(mint, maxt int64) (storage.Querie
}

return &parquetQuerierWithFallback{
minT: mint,
maxT: maxt,
parquetQuerier: pq,
queryStoreAfter: p.queryStoreAfter,
blocksStoreQuerier: bsq,
finder: p.finder,
metrics: p.metrics,
limits: p.limits,
logger: p.logger,
minT: mint,
maxT: maxt,
parquetQuerier: pq,
queryStoreAfter: p.queryStoreAfter,
blocksStoreQuerier: bsq,
finder: p.finder,
metrics: p.metrics,
limits: p.limits,
logger: p.logger,
defaultBlockStoreType: p.defaultBlockStoreType,
}, nil
}

@@ -224,6 +261,8 @@ type parquetQuerierWithFallback struct {

limits *validation.Overrides
logger log.Logger

defaultBlockStoreType blockStoreType
}

func (q *parquetQuerierWithFallback) LabelValues(ctx context.Context, name string, hints *storage.LabelHints, matchers ...*labels.Matcher) ([]string, annotations.Annotations, error) {
@@ -394,10 +433,11 @@ func (q *parquetQuerierWithFallback) getBlocks(ctx context.Context, minT, maxT i
return nil, nil, err
}

useParquet := getBlockStoreType(ctx, q.defaultBlockStoreType) == parquetBlockStore
parquetBlocks := make([]*bucketindex.Block, 0, len(blocks))
remaining := make([]*bucketindex.Block, 0, len(blocks))
for _, b := range blocks {
if b.Parquet != nil {
if useParquet && b.Parquet != nil {
parquetBlocks = append(parquetBlocks, b)
continue
}
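In getBlocks the routing is now two-level: the context value set from the header (falling back to the configured default) picks the store, and even when parquet is chosen a block only goes to the parquet querier if it has actually been converted (b.Parquet != nil). A condensed sketch of that routing, written as if it lived in pkg/querier so it can reuse the identifiers introduced above (the helper name pickStores is illustrative, not part of the change):

// pickStores mirrors the decision in getBlocks: blocks go to the parquet querier
// only when the effective block store type is parquet AND the block carries a
// parquet converter marker; everything else stays on the store-gateway path.
func pickStores(ctx context.Context, def blockStoreType, blocks []*bucketindex.Block) (parquetBlocks, remaining []*bucketindex.Block) {
	useParquet := getBlockStoreType(ctx, def) == parquetBlockStore
	for _, b := range blocks {
		if useParquet && b.Parquet != nil {
			parquetBlocks = append(parquetBlocks, b)
			continue
		}
		remaining = append(remaining, b)
	}
	return parquetBlocks, remaining
}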
168 changes: 136 additions & 32 deletions pkg/querier/parquet_queryable_test.go
@@ -76,14 +76,15 @@ func TestParquetQueryableFallbackLogic(t *testing.T) {

mParquetQuerier := &mockParquetQuerier{}
pq := &parquetQuerierWithFallback{
minT: minT,
maxT: maxT,
finder: finder,
blocksStoreQuerier: q,
parquetQuerier: mParquetQuerier,
metrics: newParquetQueryableFallbackMetrics(prometheus.NewRegistry()),
limits: defaultOverrides(t, 4),
logger: log.NewNopLogger(),
minT: minT,
maxT: maxT,
finder: finder,
blocksStoreQuerier: q,
parquetQuerier: mParquetQuerier,
metrics: newParquetQueryableFallbackMetrics(prometheus.NewRegistry()),
limits: defaultOverrides(t, 4),
logger: log.NewNopLogger(),
defaultBlockStoreType: parquetBlockStore,
}

finder.On("GetBlocks", mock.Anything, "user-1", minT, maxT).Return(bucketindex.Blocks{
@@ -118,14 +119,15 @@ func TestParquetQueryableFallbackLogic(t *testing.T) {

mParquetQuerier := &mockParquetQuerier{}
pq := &parquetQuerierWithFallback{
minT: minT,
maxT: maxT,
finder: finder,
blocksStoreQuerier: q,
parquetQuerier: mParquetQuerier,
metrics: newParquetQueryableFallbackMetrics(prometheus.NewRegistry()),
limits: defaultOverrides(t, 0),
logger: log.NewNopLogger(),
minT: minT,
maxT: maxT,
finder: finder,
blocksStoreQuerier: q,
parquetQuerier: mParquetQuerier,
metrics: newParquetQueryableFallbackMetrics(prometheus.NewRegistry()),
limits: defaultOverrides(t, 0),
logger: log.NewNopLogger(),
defaultBlockStoreType: parquetBlockStore,
}

finder.On("GetBlocks", mock.Anything, "user-1", minT, maxT).Return(bucketindex.Blocks{
@@ -178,14 +180,15 @@ func TestParquetQueryableFallbackLogic(t *testing.T) {

mParquetQuerier := &mockParquetQuerier{}
pq := &parquetQuerierWithFallback{
minT: minT,
maxT: maxT,
finder: finder,
blocksStoreQuerier: q,
parquetQuerier: mParquetQuerier,
metrics: newParquetQueryableFallbackMetrics(prometheus.NewRegistry()),
limits: defaultOverrides(t, 0),
logger: log.NewNopLogger(),
minT: minT,
maxT: maxT,
finder: finder,
blocksStoreQuerier: q,
parquetQuerier: mParquetQuerier,
metrics: newParquetQueryableFallbackMetrics(prometheus.NewRegistry()),
limits: defaultOverrides(t, 0),
logger: log.NewNopLogger(),
defaultBlockStoreType: parquetBlockStore,
}

finder.On("GetBlocks", mock.Anything, "user-1", minT, maxT).Return(bucketindex.Blocks{
@@ -244,14 +247,15 @@ func TestParquetQueryableFallbackLogic(t *testing.T) {

mParquetQuerier := &mockParquetQuerier{}
pq := &parquetQuerierWithFallback{
minT: minT,
maxT: maxT,
finder: finder,
blocksStoreQuerier: q,
parquetQuerier: mParquetQuerier,
metrics: newParquetQueryableFallbackMetrics(prometheus.NewRegistry()),
limits: defaultOverrides(t, 0),
logger: log.NewNopLogger(),
minT: minT,
maxT: maxT,
finder: finder,
blocksStoreQuerier: q,
parquetQuerier: mParquetQuerier,
metrics: newParquetQueryableFallbackMetrics(prometheus.NewRegistry()),
limits: defaultOverrides(t, 0),
logger: log.NewNopLogger(),
defaultBlockStoreType: parquetBlockStore,
}

finder.On("GetBlocks", mock.Anything, "user-1", minT, maxT).Return(bucketindex.Blocks{
@@ -291,6 +295,106 @@ func TestParquetQueryableFallbackLogic(t *testing.T) {
})
})

t.Run("Default query TSDB block store even if parquet blocks available. Override with ctx", func(t *testing.T) {
finder := &blocksFinderMock{}
stores := createStore()

q := &blocksStoreQuerier{
minT: minT,
maxT: maxT,
finder: finder,
stores: stores,
consistency: NewBlocksConsistencyChecker(0, 0, log.NewNopLogger(), nil),
logger: log.NewNopLogger(),
metrics: newBlocksStoreQueryableMetrics(prometheus.NewPedanticRegistry()),
limits: &blocksStoreLimitsMock{},

storeGatewayConsistencyCheckMaxAttempts: 3,
}

mParquetQuerier := &mockParquetQuerier{}
pq := &parquetQuerierWithFallback{
minT: minT,
maxT: maxT,
finder: finder,
blocksStoreQuerier: q,
parquetQuerier: mParquetQuerier,
metrics: newParquetQueryableFallbackMetrics(prometheus.NewRegistry()),
limits: defaultOverrides(t, 0),
logger: log.NewNopLogger(),
defaultBlockStoreType: tsdbBlockStore,
}

finder.On("GetBlocks", mock.Anything, "user-1", minT, maxT).Return(bucketindex.Blocks{
&bucketindex.Block{ID: block1, Parquet: &parquet.ConverterMarkMeta{Version: 1}},
&bucketindex.Block{ID: block2, Parquet: &parquet.ConverterMarkMeta{Version: 1}},
}, map[ulid.ULID]*bucketindex.BlockDeletionMark(nil), nil)

t.Run("select", func(t *testing.T) {
stores.Reset()
mParquetQuerier.Reset()
ss := pq.Select(ctx, true, nil, matchers...)
require.NoError(t, ss.Err())
require.Len(t, stores.queriedBlocks, 2)
require.Len(t, mParquetQuerier.queriedBlocks, 0)
})

t.Run("select with ctx key override to parquet", func(t *testing.T) {
stores.Reset()
mParquetQuerier.Reset()
newCtx := AddBlockStoreTypeToContext(ctx, string(parquetBlockStore))
ss := pq.Select(newCtx, true, nil, matchers...)
require.NoError(t, ss.Err())
require.Len(t, stores.queriedBlocks, 0)
require.Len(t, mParquetQuerier.queriedBlocks, 2)
})

t.Run("labelNames", func(t *testing.T) {
stores.Reset()
mParquetQuerier.Reset()
r, _, err := pq.LabelNames(ctx, nil, matchers...)
require.NoError(t, err)
require.Len(t, stores.queriedBlocks, 2)
require.Len(t, mParquetQuerier.queriedBlocks, 0)
require.Contains(t, r, "fromSg")
require.NotContains(t, r, "fromParquet")
})

t.Run("labelNames with ctx key override to parquet", func(t *testing.T) {
stores.Reset()
mParquetQuerier.Reset()
newCtx := AddBlockStoreTypeToContext(ctx, string(parquetBlockStore))
r, _, err := pq.LabelNames(newCtx, nil, matchers...)
require.NoError(t, err)
require.Len(t, stores.queriedBlocks, 0)
require.Len(t, mParquetQuerier.queriedBlocks, 2)
require.NotContains(t, r, "fromSg")
require.Contains(t, r, "fromParquet")
})

t.Run("labelValues", func(t *testing.T) {
stores.Reset()
mParquetQuerier.Reset()
r, _, err := pq.LabelValues(ctx, labels.MetricName, nil, matchers...)
require.NoError(t, err)
require.Len(t, stores.queriedBlocks, 2)
require.Len(t, mParquetQuerier.queriedBlocks, 0)
require.Contains(t, r, "fromSg")
require.NotContains(t, r, "fromParquet")
})

t.Run("labelValues with ctx key override to parquet", func(t *testing.T) {
stores.Reset()
mParquetQuerier.Reset()
newCtx := AddBlockStoreTypeToContext(ctx, string(parquetBlockStore))
r, _, err := pq.LabelValues(newCtx, labels.MetricName, nil, matchers...)
require.NoError(t, err)
require.Len(t, stores.queriedBlocks, 0)
require.Len(t, mParquetQuerier.queriedBlocks, 2)
require.NotContains(t, r, "fromSg")
require.Contains(t, r, "fromParquet")
})
})
}

func defaultOverrides(t *testing.T, queryVerticalShardSize int) *validation.Overrides {
14 changes: 12 additions & 2 deletions pkg/querier/querier.go
@@ -5,6 +5,7 @@ import (
"errors"
"flag"
"fmt"
"slices"
"strings"
"sync"
"time"
@@ -93,8 +94,9 @@ type Config struct {
EnablePromQLExperimentalFunctions bool `yaml:"enable_promql_experimental_functions"`

// Query Parquet files if available
EnableParquetQueryable bool `yaml:"enable_parquet_queryable" doc:"hidden"`
ParquetQueryableShardCacheSize int `yaml:"parquet_queryable_shard_cache_size" doc:"hidden"`
EnableParquetQueryable bool `yaml:"enable_parquet_queryable" doc:"hidden"`
ParquetQueryableShardCacheSize int `yaml:"parquet_queryable_shard_cache_size" doc:"hidden"`
ParquetQueryableDefaultBlockStore string `yaml:"parquet_queryable_default_block_store" doc:"hidden"`
}

var (
@@ -104,6 +106,7 @@ var (
errUnsupportedResponseCompression = errors.New("unsupported response compression. Supported compression 'gzip' and '' (disable compression)")
errInvalidConsistencyCheckAttempts = errors.New("store gateway consistency check max attempts should be greater or equal than 1")
errInvalidIngesterQueryMaxAttempts = errors.New("ingester query max attempts should be greater or equal than 1")
errInvalidParquetQueryableDefaultBlockStore = errors.New("unsupported parquet queryable default block store. Supported options are tsdb and parquet")
)

// RegisterFlags adds the flags required to config this to the given FlagSet.
@@ -142,6 +145,7 @@ func (cfg *Config) RegisterFlags(f *flag.FlagSet) {
f.BoolVar(&cfg.EnablePromQLExperimentalFunctions, "querier.enable-promql-experimental-functions", false, "[Experimental] If true, experimental promQL functions are enabled.")
f.BoolVar(&cfg.EnableParquetQueryable, "querier.enable-parquet-queryable", false, "[Experimental] If true, querier will try to query the parquet files if available.")
f.IntVar(&cfg.ParquetQueryableShardCacheSize, "querier.parquet-queryable-shard-cache-size", 512, "[Experimental] Maximum size of the Parquet queryable shard cache. 0 to disable.")
f.StringVar(&cfg.ParquetQueryableDefaultBlockStore, "querier.parquet-queryable-default-block-store", string(parquetBlockStore), "Parquet queryable's default block store to query. Valid options are tsdb and parquet. If it is set to tsdb, the parquet queryable always falls back to the store gateway.")
}

// Validate the config
@@ -171,6 +175,12 @@ func (cfg *Config) Validate() error {
return errInvalidIngesterQueryMaxAttempts
}

if cfg.EnableParquetQueryable {
if !slices.Contains(validBlockStoreTypes, blockStoreType(cfg.ParquetQueryableDefaultBlockStore)) {
return errInvalidParquetQueryableDefaultBlockStore
}
}

return nil
}

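The new flag defaults to parquet, so simply enabling the parquet queryable prefers parquet blocks; operators who want the store gateway to stay the default can set it to tsdb and opt in per query via the header. A sketch of wiring and validating the new options, assuming the defaults registered by RegisterFlags otherwise pass Validate() (flag names are taken from the change above; the surrounding scaffolding is illustrative):

// Illustrative only: parse the new querier flags the way a Cortex binary would,
// then validate the combination.
fs := flag.NewFlagSet("querier", flag.ContinueOnError)
var cfg Config
cfg.RegisterFlags(fs)

if err := fs.Parse([]string{
	"-querier.enable-parquet-queryable=true",
	"-querier.parquet-queryable-default-block-store=tsdb", // "parquet" is the default
}); err != nil {
	panic(err)
}

// A value outside {"tsdb", "parquet"} makes Validate() return
// errInvalidParquetQueryableDefaultBlockStore (covered by the new test case below).
if err := cfg.Validate(); err != nil {
	panic(err)
}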
7 changes: 7 additions & 0 deletions pkg/querier/querier_test.go
@@ -1677,6 +1677,13 @@ func TestConfig_Validate(t *testing.T) {
},
expected: errShuffleShardingLookbackLessThanQueryStoreAfter,
},
"should fail if invalid parquet queryable default block store": {
setup: func(cfg *Config) {
cfg.EnableParquetQueryable = true
cfg.ParquetQueryableDefaultBlockStore = "none"
},
expected: errInvalidParquetQueryableDefaultBlockStore,
},
}

for testName, testData := range tests {