diff --git a/CHANGELOG.md b/CHANGELOG.md index 77e9869a0d4..56a5f900ec4 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -60,6 +60,7 @@ * [ENHANCEMENT] Querier: Support query limits in parquet queryable. #6870 * [ENHANCEMENT] Ring: Add zone label to ring_members metric. #6900 * [ENHANCEMENT] Ingester: Add new metric `cortex_ingester_push_errors_total` to track reasons for ingester request failures. #6901 +* [ENHANCEMENT] Parquet Storage: Allow Parquet Queryable to disable fallback to Store Gateway. #6920 * [BUGFIX] Ingester: Avoid error or early throttling when READONLY ingesters are present in the ring #6517 * [BUGFIX] Ingester: Fix labelset data race condition. #6573 * [BUGFIX] Compactor: Cleaner should not put deletion marker for blocks with no-compact marker. #6576 diff --git a/pkg/querier/parquet_queryable.go b/pkg/querier/parquet_queryable.go index 520438c5414..9d24f58219a 100644 --- a/pkg/querier/parquet_queryable.go +++ b/pkg/querier/parquet_queryable.go @@ -3,6 +3,7 @@ package querier import ( "context" "fmt" + "strings" "time" "github.com/go-kit/log" @@ -50,7 +51,9 @@ const ( parquetBlockStore blockStoreType = "parquet" ) -var validBlockStoreTypes = []blockStoreType{tsdbBlockStore, parquetBlockStore} +var ( + validBlockStoreTypes = []blockStoreType{tsdbBlockStore, parquetBlockStore} +) // AddBlockStoreTypeToContext checks HTTP header and set block store key to context if // relevant header is set. @@ -91,6 +94,7 @@ func newParquetQueryableFallbackMetrics(reg prometheus.Registerer) *parquetQuery type parquetQueryableWithFallback struct { services.Service + fallbackDisabled bool queryStoreAfter time.Duration parquetQueryable storage.Queryable blockStorageQueryable *BlocksStoreQueryable @@ -255,6 +259,7 @@ func NewParquetQueryable( limits: limits, logger: logger, defaultBlockStoreType: blockStoreType(config.ParquetQueryableDefaultBlockStore), + fallbackDisabled: config.ParquetQueryableFallbackDisabled, } p.Service = services.NewBasicService(p.starting, p.running, p.stopping) @@ -307,6 +312,7 @@ func (p *parquetQueryableWithFallback) Querier(mint, maxt int64) (storage.Querie limits: p.limits, logger: p.logger, defaultBlockStoreType: p.defaultBlockStoreType, + fallbackDisabled: p.fallbackDisabled, }, nil } @@ -329,6 +335,8 @@ type parquetQuerierWithFallback struct { logger log.Logger defaultBlockStoreType blockStoreType + + fallbackDisabled bool } func (q *parquetQuerierWithFallback) LabelValues(ctx context.Context, name string, hints *storage.LabelHints, matchers ...*labels.Matcher) ([]string, annotations.Annotations, error) { @@ -351,6 +359,10 @@ func (q *parquetQuerierWithFallback) LabelValues(ctx context.Context, name strin rAnnotations annotations.Annotations ) + if len(remaining) > 0 && q.fallbackDisabled { + return nil, nil, parquetConsistencyCheckError(remaining) + } + if len(parquet) > 0 { res, ann, qErr := q.parquetQuerier.LabelValues(InjectBlocksIntoContext(ctx, parquet...), name, hints, matchers...) if qErr != nil { @@ -401,6 +413,10 @@ func (q *parquetQuerierWithFallback) LabelNames(ctx context.Context, hints *stor rAnnotations annotations.Annotations ) + if len(remaining) > 0 && q.fallbackDisabled { + return nil, nil, parquetConsistencyCheckError(remaining) + } + if len(parquet) > 0 { res, ann, qErr := q.parquetQuerier.LabelNames(InjectBlocksIntoContext(ctx, parquet...), hints, matchers...) if qErr != nil { @@ -466,6 +482,11 @@ func (q *parquetQuerierWithFallback) Select(ctx context.Context, sortSeries bool return storage.ErrSeriesSet(err) } + if len(remaining) > 0 && q.fallbackDisabled { + err = parquetConsistencyCheckError(remaining) + return storage.ErrSeriesSet(err) + } + // Lets sort the series to merge if len(parquet) > 0 && len(remaining) > 0 { sortSeries = true @@ -691,3 +712,15 @@ func extractShardMatcherFromContext(ctx context.Context) (*storepb.ShardMatcher, return nil, false } + +func parquetConsistencyCheckError(blocks []*bucketindex.Block) error { + return fmt.Errorf("consistency check failed because some blocks were not available as parquet files: %s", strings.Join(convertBlockULIDToString(blocks), " ")) +} + +func convertBlockULIDToString(blocks []*bucketindex.Block) []string { + res := make([]string, len(blocks)) + for idx, b := range blocks { + res[idx] = b.ID.String() + } + return res +} diff --git a/pkg/querier/parquet_queryable_test.go b/pkg/querier/parquet_queryable_test.go index 73f7c50af21..01a4bcd559c 100644 --- a/pkg/querier/parquet_queryable_test.go +++ b/pkg/querier/parquet_queryable_test.go @@ -716,3 +716,157 @@ func TestMaterializedLabelsFilterCallback(t *testing.T) { }) } } + +func TestParquetQueryableFallbackDisabled(t *testing.T) { + block1 := ulid.MustNew(1, nil) + block2 := ulid.MustNew(2, nil) + minT := int64(10) + maxT := util.TimeToMillis(time.Now()) + + createStore := func() *blocksStoreSetMock { + return &blocksStoreSetMock{mockedResponses: []interface{}{ + map[BlocksStoreClient][]ulid.ULID{ + &storeGatewayClientMock{remoteAddr: "1.1.1.1", + mockedSeriesResponses: []*storepb.SeriesResponse{ + mockSeriesResponse(labels.Labels{{Name: labels.MetricName, Value: "fromSg"}}, []cortexpb.Sample{{Value: 1, TimestampMs: minT}, {Value: 2, TimestampMs: minT + 1}}, nil, nil), + mockHintsResponse(block1, block2), + }, + mockedLabelNamesResponse: &storepb.LabelNamesResponse{ + Names: namesFromSeries(labels.FromMap(map[string]string{labels.MetricName: "fromSg", "fromSg": "fromSg"})), + Warnings: []string{}, + Hints: mockNamesHints(block1, block2), + }, + mockedLabelValuesResponse: &storepb.LabelValuesResponse{ + Values: valuesFromSeries(labels.MetricName, labels.FromMap(map[string]string{labels.MetricName: "fromSg", "fromSg": "fromSg"})), + Warnings: []string{}, + Hints: mockValuesHints(block1, block2), + }, + }: {block1, block2}}, + }, + } + } + + matchers := []*labels.Matcher{ + labels.MustNewMatcher(labels.MatchEqual, labels.MetricName, "fromSg"), + } + ctx := user.InjectOrgID(context.Background(), "user-1") + + t.Run("should return consistency check errors when fallback disabled and some blocks not available as parquet", func(t *testing.T) { + finder := &blocksFinderMock{} + stores := createStore() + + q := &blocksStoreQuerier{ + minT: minT, + maxT: maxT, + finder: finder, + stores: stores, + consistency: NewBlocksConsistencyChecker(0, 0, log.NewNopLogger(), nil), + logger: log.NewNopLogger(), + metrics: newBlocksStoreQueryableMetrics(prometheus.NewPedanticRegistry()), + limits: &blocksStoreLimitsMock{}, + + storeGatewayConsistencyCheckMaxAttempts: 3, + } + + mParquetQuerier := &mockParquetQuerier{} + pq := &parquetQuerierWithFallback{ + minT: minT, + maxT: maxT, + finder: finder, + blocksStoreQuerier: q, + parquetQuerier: mParquetQuerier, + queryStoreAfter: time.Hour, + metrics: newParquetQueryableFallbackMetrics(prometheus.NewRegistry()), + limits: defaultOverrides(t, 0), + logger: log.NewNopLogger(), + defaultBlockStoreType: parquetBlockStore, + fallbackDisabled: true, // Disable fallback + } + + // Set up blocks where block1 has parquet metadata but block2 doesn't + finder.On("GetBlocks", mock.Anything, "user-1", minT, mock.Anything).Return(bucketindex.Blocks{ + &bucketindex.Block{ID: block1, Parquet: &parquet.ConverterMarkMeta{Version: 1}}, // Available as parquet + &bucketindex.Block{ID: block2}, // Not available as parquet + }, map[ulid.ULID]*bucketindex.BlockDeletionMark(nil), nil) + + expectedError := fmt.Sprintf("consistency check failed because some blocks were not available as parquet files: %s", block2.String()) + + t.Run("select should return consistency check error", func(t *testing.T) { + ss := pq.Select(ctx, true, nil, matchers...) + require.Error(t, ss.Err()) + require.Contains(t, ss.Err().Error(), expectedError) + }) + + t.Run("labelNames should return consistency check error", func(t *testing.T) { + _, _, err := pq.LabelNames(ctx, nil, matchers...) + require.Error(t, err) + require.Contains(t, err.Error(), expectedError) + }) + + t.Run("labelValues should return consistency check error", func(t *testing.T) { + _, _, err := pq.LabelValues(ctx, labels.MetricName, nil, matchers...) + require.Error(t, err) + require.Contains(t, err.Error(), expectedError) + }) + }) + + t.Run("should work normally when all blocks are available as parquet and fallback disabled", func(t *testing.T) { + finder := &blocksFinderMock{} + stores := createStore() + + q := &blocksStoreQuerier{ + minT: minT, + maxT: maxT, + finder: finder, + stores: stores, + consistency: NewBlocksConsistencyChecker(0, 0, log.NewNopLogger(), nil), + logger: log.NewNopLogger(), + metrics: newBlocksStoreQueryableMetrics(prometheus.NewPedanticRegistry()), + limits: &blocksStoreLimitsMock{}, + + storeGatewayConsistencyCheckMaxAttempts: 3, + } + + mParquetQuerier := &mockParquetQuerier{} + pq := &parquetQuerierWithFallback{ + minT: minT, + maxT: maxT, + finder: finder, + blocksStoreQuerier: q, + parquetQuerier: mParquetQuerier, + queryStoreAfter: time.Hour, + metrics: newParquetQueryableFallbackMetrics(prometheus.NewRegistry()), + limits: defaultOverrides(t, 0), + logger: log.NewNopLogger(), + defaultBlockStoreType: parquetBlockStore, + fallbackDisabled: true, // Disable fallback + } + + // Set up blocks where both blocks have parquet metadata + finder.On("GetBlocks", mock.Anything, "user-1", minT, mock.Anything).Return(bucketindex.Blocks{ + &bucketindex.Block{ID: block1, Parquet: &parquet.ConverterMarkMeta{Version: 1}}, // Available as parquet + &bucketindex.Block{ID: block2, Parquet: &parquet.ConverterMarkMeta{Version: 1}}, // Available as parquet + }, map[ulid.ULID]*bucketindex.BlockDeletionMark(nil), nil) + + t.Run("select should work without error", func(t *testing.T) { + mParquetQuerier.Reset() + ss := pq.Select(ctx, true, nil, matchers...) + require.NoError(t, ss.Err()) + require.Len(t, mParquetQuerier.queriedBlocks, 2) + }) + + t.Run("labelNames should work without error", func(t *testing.T) { + mParquetQuerier.Reset() + _, _, err := pq.LabelNames(ctx, nil, matchers...) + require.NoError(t, err) + require.Len(t, mParquetQuerier.queriedBlocks, 2) + }) + + t.Run("labelValues should work without error", func(t *testing.T) { + mParquetQuerier.Reset() + _, _, err := pq.LabelValues(ctx, labels.MetricName, nil, matchers...) + require.NoError(t, err) + require.Len(t, mParquetQuerier.queriedBlocks, 2) + }) + }) +} diff --git a/pkg/querier/querier.go b/pkg/querier/querier.go index ffe6c2e0b50..55ff878d6c5 100644 --- a/pkg/querier/querier.go +++ b/pkg/querier/querier.go @@ -95,6 +95,7 @@ type Config struct { EnableParquetQueryable bool `yaml:"enable_parquet_queryable" doc:"hidden"` ParquetQueryableShardCacheSize int `yaml:"parquet_queryable_shard_cache_size" doc:"hidden"` ParquetQueryableDefaultBlockStore string `yaml:"parquet_queryable_default_block_store" doc:"hidden"` + ParquetQueryableFallbackDisabled bool `yaml:"parquet_queryable_fallback_disabled" doc:"hidden"` } var ( @@ -145,6 +146,7 @@ func (cfg *Config) RegisterFlags(f *flag.FlagSet) { f.BoolVar(&cfg.EnableParquetQueryable, "querier.enable-parquet-queryable", false, "[Experimental] If true, querier will try to query the parquet files if available.") f.IntVar(&cfg.ParquetQueryableShardCacheSize, "querier.parquet-queryable-shard-cache-size", 512, "[Experimental] [Experimental] Maximum size of the Parquet queryable shard cache. 0 to disable.") f.StringVar(&cfg.ParquetQueryableDefaultBlockStore, "querier.parquet-queryable-default-block-store", string(parquetBlockStore), "Parquet queryable's default block store to query. Valid options are tsdb and parquet. If it is set to tsdb, parquet queryable always fallback to store gateway.") + f.BoolVar(&cfg.ParquetQueryableFallbackDisabled, "querier.parquet-queryable-fallback-disabled", false, "[Experimental] Disable Parquet queryable to fallback queries to Store Gateway if the block is not available as Parquet files but available in TSDB. Setting this to true will disable the fallback and users can remove Store Gateway. But need to make sure Parquet files are created before it is queryable.") } // Validate the config