diff --git a/pkg/api/queryapi/query_api.go b/pkg/api/queryapi/query_api.go index c07c21cdd05..e3488f10387 100644 --- a/pkg/api/queryapi/query_api.go +++ b/pkg/api/queryapi/query_api.go @@ -18,6 +18,7 @@ import ( "github.com/weaveworks/common/httpgrpc" "github.com/cortexproject/cortex/pkg/engine" + "github.com/cortexproject/cortex/pkg/querier" "github.com/cortexproject/cortex/pkg/util" "github.com/cortexproject/cortex/pkg/util/api" ) @@ -98,6 +99,7 @@ func (q *QueryAPI) RangeQueryHandler(r *http.Request) (result apiFuncResult) { } ctx = engine.AddEngineTypeToContext(ctx, r) + ctx = querier.AddBlockStoreTypeToContext(ctx, r.Header.Get(querier.BlockStoreTypeHeader)) qry, err := q.queryEngine.NewRangeQuery(ctx, q.queryable, opts, r.FormValue("query"), convertMsToTime(start), convertMsToTime(end), convertMsToDuration(step)) if err != nil { return invalidParamError(httpgrpc.Errorf(http.StatusBadRequest, "%s", err.Error()), "query") @@ -153,6 +155,7 @@ func (q *QueryAPI) InstantQueryHandler(r *http.Request) (result apiFuncResult) { } ctx = engine.AddEngineTypeToContext(ctx, r) + ctx = querier.AddBlockStoreTypeToContext(ctx, r.Header.Get(querier.BlockStoreTypeHeader)) qry, err := q.queryEngine.NewInstantQuery(ctx, q.queryable, opts, r.FormValue("query"), convertMsToTime(ts)) if err != nil { return invalidParamError(httpgrpc.Errorf(http.StatusBadRequest, "%s", err.Error()), "query") diff --git a/pkg/querier/parquet_queryable.go b/pkg/querier/parquet_queryable.go index 1b9093ab047..94aa71e13e9 100644 --- a/pkg/querier/parquet_queryable.go +++ b/pkg/querier/parquet_queryable.go @@ -33,6 +33,39 @@ import ( "github.com/cortexproject/cortex/pkg/util/validation" ) +type blockStorageType struct{} + +var blockStorageKey = blockStorageType{} + +const BlockStoreTypeHeader = "X-Cortex-BlockStore-Type" + +type blockStoreType string + +const ( + tsdbBlockStore blockStoreType = "tsdb" + parquetBlockStore blockStoreType = "parquet" +) + +var validBlockStoreTypes = 
[]blockStoreType{tsdbBlockStore, parquetBlockStore} + +// AddBlockStoreTypeToContext stores the given block store type in the context when it is +// a valid type (tsdb or parquet); otherwise the context is returned unchanged. +func AddBlockStoreTypeToContext(ctx context.Context, storeType string) context.Context { + ng := blockStoreType(storeType) + switch ng { + case tsdbBlockStore, parquetBlockStore: + return context.WithValue(ctx, blockStorageKey, ng) + } + return ctx +} + +func getBlockStoreType(ctx context.Context, defaultBlockStoreType blockStoreType) blockStoreType { + if ng, ok := ctx.Value(blockStorageKey).(blockStoreType); ok { + return ng + } + return defaultBlockStoreType +} + type parquetQueryableFallbackMetrics struct { blocksQueriedTotal *prometheus.CounterVec selectCount *prometheus.CounterVec @@ -69,6 +102,8 @@ type parquetQueryableWithFallback struct { limits *validation.Overrides logger log.Logger + + defaultBlockStoreType blockStoreType } func NewParquetQueryable( @@ -153,6 +188,7 @@ func NewParquetQueryable( metrics: newParquetQueryableFallbackMetrics(reg), limits: limits, logger: logger, + defaultBlockStoreType: blockStoreType(config.ParquetQueryableDefaultBlockStore), } p.Service = services.NewBasicService(p.starting, p.running, p.stopping) @@ -195,15 +231,16 @@ func (p *parquetQueryableWithFallback) Querier(mint, maxt int64) (storage.Querie } return &parquetQuerierWithFallback{ - minT: mint, - maxT: maxt, - parquetQuerier: pq, - queryStoreAfter: p.queryStoreAfter, - blocksStoreQuerier: bsq, - finder: p.finder, - metrics: p.metrics, - limits: p.limits, - logger: p.logger, + minT: mint, + maxT: maxt, + parquetQuerier: pq, + queryStoreAfter: p.queryStoreAfter, + blocksStoreQuerier: bsq, + finder: p.finder, + metrics: p.metrics, + limits: p.limits, + logger: p.logger, + defaultBlockStoreType: p.defaultBlockStoreType, }, nil } @@ -224,6 +261,8 @@ type parquetQuerierWithFallback struct { limits *validation.Overrides logger log.Logger + + defaultBlockStoreType blockStoreType } func (q
*parquetQuerierWithFallback) LabelValues(ctx context.Context, name string, hints *storage.LabelHints, matchers ...*labels.Matcher) ([]string, annotations.Annotations, error) { @@ -394,10 +433,11 @@ func (q *parquetQuerierWithFallback) getBlocks(ctx context.Context, minT, maxT i return nil, nil, err } + useParquet := getBlockStoreType(ctx, q.defaultBlockStoreType) == parquetBlockStore parquetBlocks := make([]*bucketindex.Block, 0, len(blocks)) remaining := make([]*bucketindex.Block, 0, len(blocks)) for _, b := range blocks { - if b.Parquet != nil { + if useParquet && b.Parquet != nil { parquetBlocks = append(parquetBlocks, b) continue } diff --git a/pkg/querier/parquet_queryable_test.go b/pkg/querier/parquet_queryable_test.go index cd6aa02f874..edb503a5d5c 100644 --- a/pkg/querier/parquet_queryable_test.go +++ b/pkg/querier/parquet_queryable_test.go @@ -76,14 +76,15 @@ func TestParquetQueryableFallbackLogic(t *testing.T) { mParquetQuerier := &mockParquetQuerier{} pq := &parquetQuerierWithFallback{ - minT: minT, - maxT: maxT, - finder: finder, - blocksStoreQuerier: q, - parquetQuerier: mParquetQuerier, - metrics: newParquetQueryableFallbackMetrics(prometheus.NewRegistry()), - limits: defaultOverrides(t, 4), - logger: log.NewNopLogger(), + minT: minT, + maxT: maxT, + finder: finder, + blocksStoreQuerier: q, + parquetQuerier: mParquetQuerier, + metrics: newParquetQueryableFallbackMetrics(prometheus.NewRegistry()), + limits: defaultOverrides(t, 4), + logger: log.NewNopLogger(), + defaultBlockStoreType: parquetBlockStore, } finder.On("GetBlocks", mock.Anything, "user-1", minT, maxT).Return(bucketindex.Blocks{ @@ -118,14 +119,15 @@ func TestParquetQueryableFallbackLogic(t *testing.T) { mParquetQuerier := &mockParquetQuerier{} pq := &parquetQuerierWithFallback{ - minT: minT, - maxT: maxT, - finder: finder, - blocksStoreQuerier: q, - parquetQuerier: mParquetQuerier, - metrics: newParquetQueryableFallbackMetrics(prometheus.NewRegistry()), - limits: defaultOverrides(t, 0), - 
logger: log.NewNopLogger(), + minT: minT, + maxT: maxT, + finder: finder, + blocksStoreQuerier: q, + parquetQuerier: mParquetQuerier, + metrics: newParquetQueryableFallbackMetrics(prometheus.NewRegistry()), + limits: defaultOverrides(t, 0), + logger: log.NewNopLogger(), + defaultBlockStoreType: parquetBlockStore, } finder.On("GetBlocks", mock.Anything, "user-1", minT, maxT).Return(bucketindex.Blocks{ @@ -178,14 +180,15 @@ func TestParquetQueryableFallbackLogic(t *testing.T) { mParquetQuerier := &mockParquetQuerier{} pq := &parquetQuerierWithFallback{ - minT: minT, - maxT: maxT, - finder: finder, - blocksStoreQuerier: q, - parquetQuerier: mParquetQuerier, - metrics: newParquetQueryableFallbackMetrics(prometheus.NewRegistry()), - limits: defaultOverrides(t, 0), - logger: log.NewNopLogger(), + minT: minT, + maxT: maxT, + finder: finder, + blocksStoreQuerier: q, + parquetQuerier: mParquetQuerier, + metrics: newParquetQueryableFallbackMetrics(prometheus.NewRegistry()), + limits: defaultOverrides(t, 0), + logger: log.NewNopLogger(), + defaultBlockStoreType: parquetBlockStore, } finder.On("GetBlocks", mock.Anything, "user-1", minT, maxT).Return(bucketindex.Blocks{ @@ -244,14 +247,15 @@ func TestParquetQueryableFallbackLogic(t *testing.T) { mParquetQuerier := &mockParquetQuerier{} pq := &parquetQuerierWithFallback{ - minT: minT, - maxT: maxT, - finder: finder, - blocksStoreQuerier: q, - parquetQuerier: mParquetQuerier, - metrics: newParquetQueryableFallbackMetrics(prometheus.NewRegistry()), - limits: defaultOverrides(t, 0), - logger: log.NewNopLogger(), + minT: minT, + maxT: maxT, + finder: finder, + blocksStoreQuerier: q, + parquetQuerier: mParquetQuerier, + metrics: newParquetQueryableFallbackMetrics(prometheus.NewRegistry()), + limits: defaultOverrides(t, 0), + logger: log.NewNopLogger(), + defaultBlockStoreType: parquetBlockStore, } finder.On("GetBlocks", mock.Anything, "user-1", minT, maxT).Return(bucketindex.Blocks{ @@ -291,6 +295,106 @@ func 
TestParquetQueryableFallbackLogic(t *testing.T) { }) }) + t.Run("Default query TSDB block store even if parquet blocks available. Override with ctx", func(t *testing.T) { + finder := &blocksFinderMock{} + stores := createStore() + + q := &blocksStoreQuerier{ + minT: minT, + maxT: maxT, + finder: finder, + stores: stores, + consistency: NewBlocksConsistencyChecker(0, 0, log.NewNopLogger(), nil), + logger: log.NewNopLogger(), + metrics: newBlocksStoreQueryableMetrics(prometheus.NewPedanticRegistry()), + limits: &blocksStoreLimitsMock{}, + + storeGatewayConsistencyCheckMaxAttempts: 3, + } + + mParquetQuerier := &mockParquetQuerier{} + pq := &parquetQuerierWithFallback{ + minT: minT, + maxT: maxT, + finder: finder, + blocksStoreQuerier: q, + parquetQuerier: mParquetQuerier, + metrics: newParquetQueryableFallbackMetrics(prometheus.NewRegistry()), + limits: defaultOverrides(t, 0), + logger: log.NewNopLogger(), + defaultBlockStoreType: tsdbBlockStore, + } + + finder.On("GetBlocks", mock.Anything, "user-1", minT, maxT).Return(bucketindex.Blocks{ + &bucketindex.Block{ID: block1, Parquet: &parquet.ConverterMarkMeta{Version: 1}}, + &bucketindex.Block{ID: block2, Parquet: &parquet.ConverterMarkMeta{Version: 1}}, + }, map[ulid.ULID]*bucketindex.BlockDeletionMark(nil), nil) + + t.Run("select", func(t *testing.T) { + stores.Reset() + mParquetQuerier.Reset() + ss := pq.Select(ctx, true, nil, matchers...) + require.NoError(t, ss.Err()) + require.Len(t, stores.queriedBlocks, 2) + require.Len(t, mParquetQuerier.queriedBlocks, 0) + }) + + t.Run("select with ctx key override to parquet", func(t *testing.T) { + stores.Reset() + mParquetQuerier.Reset() + newCtx := AddBlockStoreTypeToContext(ctx, string(parquetBlockStore)) + ss := pq.Select(newCtx, true, nil, matchers...) 
+ require.NoError(t, ss.Err()) + require.Len(t, stores.queriedBlocks, 0) + require.Len(t, mParquetQuerier.queriedBlocks, 2) + }) + + t.Run("labelNames", func(t *testing.T) { + stores.Reset() + mParquetQuerier.Reset() + r, _, err := pq.LabelNames(ctx, nil, matchers...) + require.NoError(t, err) + require.Len(t, stores.queriedBlocks, 2) + require.Len(t, mParquetQuerier.queriedBlocks, 0) + require.Contains(t, r, "fromSg") + require.NotContains(t, r, "fromParquet") + }) + + t.Run("labelNames with ctx key override to parquet", func(t *testing.T) { + stores.Reset() + mParquetQuerier.Reset() + newCtx := AddBlockStoreTypeToContext(ctx, string(parquetBlockStore)) + r, _, err := pq.LabelNames(newCtx, nil, matchers...) + require.NoError(t, err) + require.Len(t, stores.queriedBlocks, 0) + require.Len(t, mParquetQuerier.queriedBlocks, 2) + require.NotContains(t, r, "fromSg") + require.Contains(t, r, "fromParquet") + }) + + t.Run("labelValues", func(t *testing.T) { + stores.Reset() + mParquetQuerier.Reset() + r, _, err := pq.LabelValues(ctx, labels.MetricName, nil, matchers...) + require.NoError(t, err) + require.Len(t, stores.queriedBlocks, 2) + require.Len(t, mParquetQuerier.queriedBlocks, 0) + require.Contains(t, r, "fromSg") + require.NotContains(t, r, "fromParquet") + }) + + t.Run("labelValues with ctx key override to parquet", func(t *testing.T) { + stores.Reset() + mParquetQuerier.Reset() + newCtx := AddBlockStoreTypeToContext(ctx, string(parquetBlockStore)) + r, _, err := pq.LabelValues(newCtx, labels.MetricName, nil, matchers...) 
+ require.NoError(t, err) + require.Len(t, stores.queriedBlocks, 0) + require.Len(t, mParquetQuerier.queriedBlocks, 2) + require.NotContains(t, r, "fromSg") + require.Contains(t, r, "fromParquet") + }) + }) } func defaultOverrides(t *testing.T, queryVerticalShardSize int) *validation.Overrides { diff --git a/pkg/querier/querier.go b/pkg/querier/querier.go index 124615af612..3cd74b365de 100644 --- a/pkg/querier/querier.go +++ b/pkg/querier/querier.go @@ -5,6 +5,7 @@ import ( "errors" "flag" "fmt" + "slices" "strings" "sync" "time" @@ -93,8 +94,9 @@ type Config struct { EnablePromQLExperimentalFunctions bool `yaml:"enable_promql_experimental_functions"` // Query Parquet files if available - EnableParquetQueryable bool `yaml:"enable_parquet_queryable" doc:"hidden"` - ParquetQueryableShardCacheSize int `yaml:"parquet_queryable_shard_cache_size" doc:"hidden"` + EnableParquetQueryable bool `yaml:"enable_parquet_queryable" doc:"hidden"` + ParquetQueryableShardCacheSize int `yaml:"parquet_queryable_shard_cache_size" doc:"hidden"` + ParquetQueryableDefaultBlockStore string `yaml:"parquet_queryable_default_block_store" doc:"hidden"` } var ( @@ -104,6 +106,7 @@ var ( errUnsupportedResponseCompression = errors.New("unsupported response compression. Supported compression 'gzip' and '' (disable compression)") errInvalidConsistencyCheckAttempts = errors.New("store gateway consistency check max attempts should be greater or equal than 1") errInvalidIngesterQueryMaxAttempts = errors.New("ingester query max attempts should be greater or equal than 1") + errInvalidParquetQueryableDefaultBlockStore = errors.New("unsupported parquet queryable default block store. Supported options are tsdb and parquet") ) // RegisterFlags adds the flags required to config this to the given FlagSet. 
@@ -142,6 +145,7 @@ func (cfg *Config) RegisterFlags(f *flag.FlagSet) { f.BoolVar(&cfg.EnablePromQLExperimentalFunctions, "querier.enable-promql-experimental-functions", false, "[Experimental] If true, experimental promQL functions are enabled.") f.BoolVar(&cfg.EnableParquetQueryable, "querier.enable-parquet-queryable", false, "[Experimental] If true, querier will try to query the parquet files if available.") f.IntVar(&cfg.ParquetQueryableShardCacheSize, "querier.parquet-queryable-shard-cache-size", 512, "[Experimental] [Experimental] Maximum size of the Parquet queryable shard cache. 0 to disable.") + f.StringVar(&cfg.ParquetQueryableDefaultBlockStore, "querier.parquet-queryable-default-block-store", string(parquetBlockStore), "Parquet queryable's default block store to query. Valid options are tsdb and parquet. If it is set to tsdb, parquet queryable always fallback to store gateway.") } // Validate the config @@ -171,6 +175,12 @@ func (cfg *Config) Validate() error { return errInvalidIngesterQueryMaxAttempts } + if cfg.EnableParquetQueryable { + if !slices.Contains(validBlockStoreTypes, blockStoreType(cfg.ParquetQueryableDefaultBlockStore)) { + return errInvalidParquetQueryableDefaultBlockStore + } + } + return nil } diff --git a/pkg/querier/querier_test.go b/pkg/querier/querier_test.go index 055ae2fab45..f4a3e17df00 100644 --- a/pkg/querier/querier_test.go +++ b/pkg/querier/querier_test.go @@ -1677,6 +1677,13 @@ func TestConfig_Validate(t *testing.T) { }, expected: errShuffleShardingLookbackLessThanQueryStoreAfter, }, + "should fail if invalid parquet queryable default block store": { + setup: func(cfg *Config) { + cfg.EnableParquetQueryable = true + cfg.ParquetQueryableDefaultBlockStore = "none" + }, + expected: errInvalidParquetQueryableDefaultBlockStore, + }, } for testName, testData := range tests {