diff --git a/CHANGELOG.md b/CHANGELOG.md index b33105811df..ed9102d09f4 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -30,6 +30,7 @@ * [ENHANCEMENT] AlertManager: Add `keep_instance_in_the_ring_on_shutdown` and `tokens_file_path` configs for alertmanager ring. #6628 * [ENHANCEMENT] Querier: Add metric and enhanced logging for query partial data. #6676 * [ENHANCEMENT] Ingester: Push request should fail when label set is out of order #6746 +* [ENHANCEMENT] Querier: Add `querier.ingester-query-max-attempts` to retry on partial data. #6714 * [BUGFIX] Ingester: Avoid error or early throttling when READONLY ingesters are present in the ring #6517 * [BUGFIX] Ingester: Fix labelset data race condition. #6573 * [BUGFIX] Compactor: Cleaner should not put deletion marker for blocks with no-compact marker. #6576 diff --git a/docs/blocks-storage/querier.md b/docs/blocks-storage/querier.md index 6177adea64f..893ab67282a 100644 --- a/docs/blocks-storage/querier.md +++ b/docs/blocks-storage/querier.md @@ -237,6 +237,11 @@ querier: # CLI flag: -querier.store-gateway-consistency-check-max-attempts [store_gateway_consistency_check_max_attempts: | default = 3] + # The maximum number of times we attempt fetching data from ingesters for + # retryable errors (ex. partial data returned). + # CLI flag: -querier.ingester-query-max-attempts + [ingester_query_max_attempts: | default = 1] + # When distributor's sharding strategy is shuffle-sharding and this setting is # > 0, queriers fetch in-memory series from the minimum set of required # ingesters, selecting only ingesters which may have received series since diff --git a/docs/configuration/config-file-reference.md b/docs/configuration/config-file-reference.md index 292c67970c9..8d1ac8f1fae 100644 --- a/docs/configuration/config-file-reference.md +++ b/docs/configuration/config-file-reference.md @@ -4174,6 +4174,11 @@ store_gateway_client: # CLI flag: -querier.store-gateway-consistency-check-max-attempts [store_gateway_consistency_check_max_attempts: | default = 3] +# The maximum number of times we attempt fetching data from ingesters for +# retryable errors (ex. partial data returned). +# CLI flag: -querier.ingester-query-max-attempts +[ingester_query_max_attempts: | default = 1] + # When distributor's sharding strategy is shuffle-sharding and this setting is > # 0, queriers fetch in-memory series from the minimum set of required ingesters, # selecting only ingesters which may have received series since 'now - lookback diff --git a/pkg/querier/distributor_queryable.go b/pkg/querier/distributor_queryable.go index 62ed4489c7e..ce03329cb7e 100644 --- a/pkg/querier/distributor_queryable.go +++ b/pkg/querier/distributor_queryable.go @@ -20,10 +20,14 @@ import ( "github.com/cortexproject/cortex/pkg/querier/series" "github.com/cortexproject/cortex/pkg/tenant" "github.com/cortexproject/cortex/pkg/util" + "github.com/cortexproject/cortex/pkg/util/backoff" "github.com/cortexproject/cortex/pkg/util/chunkcompat" "github.com/cortexproject/cortex/pkg/util/spanlogger" ) +const retryMinBackoff = time.Millisecond +const retryMaxBackoff = 5 * time.Millisecond + // Distributor is the read interface to the distributor, made an interface here // to reduce package coupling. type Distributor interface { @@ -38,36 +42,39 @@ type Distributor interface { MetricsMetadata(ctx context.Context, req *client.MetricsMetadataRequest) ([]scrape.MetricMetadata, error) } -func newDistributorQueryable(distributor Distributor, streamingMetdata bool, labelNamesWithMatchers bool, iteratorFn chunkIteratorFunc, queryIngestersWithin time.Duration, isPartialDataEnabled partialdata.IsCfgEnabledFunc) QueryableWithFilter { +func newDistributorQueryable(distributor Distributor, streamingMetdata bool, labelNamesWithMatchers bool, iteratorFn chunkIteratorFunc, queryIngestersWithin time.Duration, isPartialDataEnabled partialdata.IsCfgEnabledFunc, ingesterQueryMaxAttempts int) QueryableWithFilter { return distributorQueryable{ - distributor: distributor, - streamingMetdata: streamingMetdata, - labelNamesWithMatchers: labelNamesWithMatchers, - iteratorFn: iteratorFn, - queryIngestersWithin: queryIngestersWithin, - isPartialDataEnabled: isPartialDataEnabled, + distributor: distributor, + streamingMetdata: streamingMetdata, + labelNamesWithMatchers: labelNamesWithMatchers, + iteratorFn: iteratorFn, + queryIngestersWithin: queryIngestersWithin, + isPartialDataEnabled: isPartialDataEnabled, + ingesterQueryMaxAttempts: ingesterQueryMaxAttempts, } } type distributorQueryable struct { - distributor Distributor - streamingMetdata bool - labelNamesWithMatchers bool - iteratorFn chunkIteratorFunc - queryIngestersWithin time.Duration - isPartialDataEnabled partialdata.IsCfgEnabledFunc + distributor Distributor + streamingMetdata bool + labelNamesWithMatchers bool + iteratorFn chunkIteratorFunc + queryIngestersWithin time.Duration + isPartialDataEnabled partialdata.IsCfgEnabledFunc + ingesterQueryMaxAttempts int } func (d distributorQueryable) Querier(mint, maxt int64) (storage.Querier, error) { return &distributorQuerier{ - distributor: d.distributor, - mint: mint, - maxt: maxt, - streamingMetadata: d.streamingMetdata, - labelNamesMatchers: d.labelNamesWithMatchers, - chunkIterFn: d.iteratorFn, - queryIngestersWithin: d.queryIngestersWithin, - isPartialDataEnabled: d.isPartialDataEnabled, + distributor: d.distributor, + mint: mint, + maxt: maxt, + streamingMetadata: d.streamingMetdata, + labelNamesMatchers: d.labelNamesWithMatchers, + chunkIterFn: d.iteratorFn, + queryIngestersWithin: d.queryIngestersWithin, + isPartialDataEnabled: d.isPartialDataEnabled, + ingesterQueryMaxAttempts: d.ingesterQueryMaxAttempts, }, nil } @@ -77,13 +84,14 @@ func (d distributorQueryable) UseQueryable(now time.Time, _, queryMaxT int64) bo } type distributorQuerier struct { - distributor Distributor - mint, maxt int64 - streamingMetadata bool - labelNamesMatchers bool - chunkIterFn chunkIteratorFunc - queryIngestersWithin time.Duration - isPartialDataEnabled partialdata.IsCfgEnabledFunc + distributor Distributor + mint, maxt int64 + streamingMetadata bool + labelNamesMatchers bool + chunkIterFn chunkIteratorFunc + queryIngestersWithin time.Duration + isPartialDataEnabled partialdata.IsCfgEnabledFunc + ingesterQueryMaxAttempts int } // Select implements storage.Querier interface. @@ -150,7 +158,9 @@ func (q *distributorQuerier) Select(ctx context.Context, sortSeries bool, sp *st } func (q *distributorQuerier) streamingSelect(ctx context.Context, sortSeries, partialDataEnabled bool, minT, maxT int64, matchers []*labels.Matcher) storage.SeriesSet { - results, err := q.distributor.QueryStream(ctx, model.Time(minT), model.Time(maxT), partialDataEnabled, matchers...) + results, err := q.queryWithRetry(ctx, func() (*client.QueryStreamResponse, error) { + return q.distributor.QueryStream(ctx, model.Time(minT), model.Time(maxT), partialDataEnabled, matchers...) + }) if err != nil && !partialdata.IsPartialDataError(err) { return storage.ErrSeriesSet(err) @@ -192,6 +202,33 @@ func (q *distributorQuerier) streamingSelect(ctx context.Context, sortSeries, pa return seriesSet } +func (q *distributorQuerier) queryWithRetry(ctx context.Context, queryFunc func() (*client.QueryStreamResponse, error)) (*client.QueryStreamResponse, error) { + if q.ingesterQueryMaxAttempts <= 1 { + return queryFunc() + } + + var result *client.QueryStreamResponse + var err error + + retries := backoff.New(ctx, backoff.Config{ + MinBackoff: retryMinBackoff, + MaxBackoff: retryMaxBackoff, + MaxRetries: q.ingesterQueryMaxAttempts, + }) + + for retries.Ongoing() { + result, err = queryFunc() + + if err == nil || !q.isRetryableError(err) { + return result, err + } + + retries.Wait() + } + + return result, err +} + func (q *distributorQuerier) LabelValues(ctx context.Context, name string, hints *storage.LabelHints, matchers ...*labels.Matcher) ([]string, annotations.Annotations, error) { var ( lvs []string @@ -201,9 +238,13 @@ func (q *distributorQuerier) LabelValues(ctx context.Context, name string, hints partialDataEnabled := q.partialDataEnabled(ctx) if q.streamingMetadata { - lvs, err = q.distributor.LabelValuesForLabelNameStream(ctx, model.Time(q.mint), model.Time(q.maxt), model.LabelName(name), hints, partialDataEnabled, matchers...) + lvs, err = q.labelsWithRetry(ctx, func() ([]string, error) { + return q.distributor.LabelValuesForLabelNameStream(ctx, model.Time(q.mint), model.Time(q.maxt), model.LabelName(name), hints, partialDataEnabled, matchers...) + }) } else { - lvs, err = q.distributor.LabelValuesForLabelName(ctx, model.Time(q.mint), model.Time(q.maxt), model.LabelName(name), hints, partialDataEnabled, matchers...) + lvs, err = q.labelsWithRetry(ctx, func() ([]string, error) { + return q.distributor.LabelValuesForLabelName(ctx, model.Time(q.mint), model.Time(q.maxt), model.LabelName(name), hints, partialDataEnabled, matchers...) + }) } if partialdata.IsPartialDataError(err) { @@ -230,9 +271,13 @@ func (q *distributorQuerier) LabelNames(ctx context.Context, hints *storage.Labe ) if q.streamingMetadata { - ln, err = q.distributor.LabelNamesStream(ctx, model.Time(q.mint), model.Time(q.maxt), hints, partialDataEnabled, matchers...) + ln, err = q.labelsWithRetry(ctx, func() ([]string, error) { + return q.distributor.LabelNamesStream(ctx, model.Time(q.mint), model.Time(q.maxt), hints, partialDataEnabled, matchers...) + }) } else { - ln, err = q.distributor.LabelNames(ctx, model.Time(q.mint), model.Time(q.maxt), hints, partialDataEnabled, matchers...) + ln, err = q.labelsWithRetry(ctx, func() ([]string, error) { + return q.distributor.LabelNames(ctx, model.Time(q.mint), model.Time(q.maxt), hints, partialDataEnabled, matchers...) + }) } if partialdata.IsPartialDataError(err) { @@ -243,6 +288,33 @@ func (q *distributorQuerier) LabelNames(ctx context.Context, hints *storage.Labe return ln, nil, err } +func (q *distributorQuerier) labelsWithRetry(ctx context.Context, labelsFunc func() ([]string, error)) ([]string, error) { + if q.ingesterQueryMaxAttempts == 1 { + return labelsFunc() + } + + var result []string + var err error + + retries := backoff.New(ctx, backoff.Config{ + MinBackoff: retryMinBackoff, + MaxBackoff: retryMaxBackoff, + MaxRetries: q.ingesterQueryMaxAttempts, + }) + + for retries.Ongoing() { + result, err = labelsFunc() + + if err == nil || !q.isRetryableError(err) { + return result, err + } + + retries.Wait() + } + + return result, err +} + // labelNamesWithMatchers performs the LabelNames call by calling ingester's MetricsForLabelMatchers method func (q *distributorQuerier) labelNamesWithMatchers(ctx context.Context, hints *storage.LabelHints, partialDataEnabled bool, matchers ...*labels.Matcher) ([]string, annotations.Annotations, error) { log, ctx := spanlogger.New(ctx, "distributorQuerier.labelNamesWithMatchers") @@ -297,6 +369,10 @@ func (q *distributorQuerier) partialDataEnabled(ctx context.Context) bool { return q.isPartialDataEnabled != nil && q.isPartialDataEnabled(userID) } +func (q *distributorQuerier) isRetryableError(err error) bool { + return partialdata.IsPartialDataError(err) +} + type distributorExemplarQueryable struct { distributor Distributor } diff --git a/pkg/querier/distributor_queryable_test.go b/pkg/querier/distributor_queryable_test.go index 2e3f834ea44..f805413f70b 100644 --- a/pkg/querier/distributor_queryable_test.go +++ b/pkg/querier/distributor_queryable_test.go @@ -9,12 +9,14 @@ import ( "github.com/prometheus/common/model" "github.com/prometheus/prometheus/model/labels" "github.com/prometheus/prometheus/storage" + "github.com/prometheus/prometheus/util/annotations" "github.com/stretchr/testify/assert" "github.com/stretchr/testify/mock" "github.com/stretchr/testify/require" "github.com/weaveworks/common/user" "github.com/cortexproject/cortex/pkg/chunk" + promchunk "github.com/cortexproject/cortex/pkg/chunk/encoding" "github.com/cortexproject/cortex/pkg/cortexpb" "github.com/cortexproject/cortex/pkg/ingester/client" "github.com/cortexproject/cortex/pkg/querier/batch" @@ -90,7 +92,7 @@ func TestDistributorQuerier_SelectShouldHonorQueryIngestersWithin(t *testing.T) distributor.On("MetricsForLabelMatchersStream", mock.Anything, mock.Anything, mock.Anything, mock.Anything, mock.Anything).Return([]labels.Labels{}, nil) ctx := user.InjectOrgID(context.Background(), "test") - queryable := newDistributorQueryable(distributor, streamingMetadataEnabled, true, nil, testData.queryIngestersWithin, nil) + queryable := newDistributorQueryable(distributor, streamingMetadataEnabled, true, nil, testData.queryIngestersWithin, nil, 1) querier, err := queryable.Querier(testData.queryMinT, testData.queryMaxT) require.NoError(t, err) @@ -129,7 +131,7 @@ func TestDistributorQueryableFilter(t *testing.T) { t.Parallel() d := &MockDistributor{} - dq := newDistributorQueryable(d, false, true, nil, 1*time.Hour, nil) + dq := newDistributorQueryable(d, false, true, nil, 1*time.Hour, nil, 1) now := time.Now() @@ -181,7 +183,7 @@ func TestIngesterStreaming(t *testing.T) { queryable := newDistributorQueryable(d, true, true, batch.NewChunkMergeIterator, 0, func(string) bool { return partialDataEnabled - }) + }, 1) querier, err := queryable.Querier(mint, maxt) require.NoError(t, err) @@ -210,6 +212,181 @@ func TestIngesterStreaming(t *testing.T) { } } +func TestDistributorQuerier_Retry(t *testing.T) { + ctx := user.InjectOrgID(context.Background(), "0") + + tests := map[string]struct { + api string + errors []error + isPartialData bool + isError bool + }{ + "Select - should retry": { + api: "Select", + errors: []error{ + partialdata.ErrPartialData, + partialdata.ErrPartialData, + nil, + }, + isError: false, + isPartialData: false, + }, + "Select - should return partial data after all retries": { + api: "Select", + errors: []error{ + partialdata.ErrPartialData, + partialdata.ErrPartialData, + partialdata.ErrPartialData, + }, + isError: false, + isPartialData: true, + }, + "Select - should not retry on other error": { + api: "Select", + errors: []error{ + fmt.Errorf("new error"), + partialdata.ErrPartialData, + }, + isError: true, + isPartialData: false, + }, + "LabelNames - should retry": { + api: "LabelNames", + errors: []error{ + partialdata.ErrPartialData, + partialdata.ErrPartialData, + nil, + }, + isError: false, + isPartialData: false, + }, + "LabelNames - should return partial data after all retries": { + api: "LabelNames", + errors: []error{ + partialdata.ErrPartialData, + partialdata.ErrPartialData, + partialdata.ErrPartialData, + }, + isError: false, + isPartialData: true, + }, + "LabelNames - should not retry on other error": { + api: "LabelNames", + errors: []error{ + fmt.Errorf("new error"), + partialdata.ErrPartialData, + }, + isError: true, + isPartialData: false, + }, + "LabelValues - should retry": { + api: "LabelValues", + errors: []error{ + partialdata.ErrPartialData, + partialdata.ErrPartialData, + nil, + }, + isError: false, + isPartialData: false, + }, + "LabelValues - should return partial data after all retries": { + api: "LabelValues", + errors: []error{ + partialdata.ErrPartialData, + partialdata.ErrPartialData, + partialdata.ErrPartialData, + }, + isError: false, + isPartialData: true, + }, + "LabelValues - should not retry on other error": { + api: "LabelValues", + errors: []error{ + fmt.Errorf("new error"), + partialdata.ErrPartialData, + }, + isError: true, + isPartialData: false, + }, + } + + for name, tc := range tests { + t.Run(name, func(t *testing.T) { + d := &MockDistributor{} + + if tc.api == "Select" { + promChunk := util.GenerateChunk(t, time.Second, model.TimeFromUnix(time.Now().Unix()), 10, promchunk.PrometheusXorChunk) + clientChunks, err := chunkcompat.ToChunks([]chunk.Chunk{promChunk}) + require.NoError(t, err) + + for _, err := range tc.errors { + d.On("QueryStream", mock.Anything, mock.Anything, mock.Anything, mock.Anything).Return(&client.QueryStreamResponse{ + Chunkseries: []client.TimeSeriesChunk{ + { + Labels: []cortexpb.LabelAdapter{ + {Name: "foo", Value: "bar"}, + }, + Chunks: clientChunks, + }, + }, + }, err).Once() + } + } else if tc.api == "LabelNames" { + res := []string{"foo"} + for _, err := range tc.errors { + d.On("LabelNamesStream", mock.Anything, mock.Anything, mock.Anything, mock.Anything, mock.Anything).Return(res, err).Once() + d.On("LabelNames", mock.Anything, mock.Anything, mock.Anything, mock.Anything, mock.Anything).Return(res, err).Once() + } + } else if tc.api == "LabelValues" { + res := []string{"foo"} + for _, err := range tc.errors { + d.On("LabelValuesForLabelNameStream", mock.Anything, mock.Anything, mock.Anything, mock.Anything, mock.Anything, mock.Anything).Return(res, err).Once() + d.On("LabelValuesForLabelName", mock.Anything, mock.Anything, mock.Anything, mock.Anything, mock.Anything, mock.Anything).Return(res, err).Once() + } + } + + ingesterQueryMaxAttempts := 3 + queryable := newDistributorQueryable(d, true, true, batch.NewChunkMergeIterator, 0, func(string) bool { + return true + }, ingesterQueryMaxAttempts) + querier, err := queryable.Querier(mint, maxt) + require.NoError(t, err) + + if tc.api == "Select" { + seriesSet := querier.Select(ctx, true, &storage.SelectHints{Start: mint, End: maxt}) + if tc.isError { + require.Error(t, seriesSet.Err()) + return + } + require.NoError(t, seriesSet.Err()) + + if tc.isPartialData { + require.Contains(t, seriesSet.Warnings(), partialdata.ErrPartialData.Error()) + } + } else { + var annots annotations.Annotations + var err error + if tc.api == "LabelNames" { + _, annots, err = querier.LabelNames(ctx, nil, labels.MustNewMatcher(labels.MatchEqual, "foo", "bar")) + } else if tc.api == "LabelValues" { + _, annots, err = querier.LabelValues(ctx, "foo", nil, labels.MustNewMatcher(labels.MatchEqual, "foo", "bar")) + } + + if tc.isError { + require.Error(t, err) + return + } + require.NoError(t, err) + + if tc.isPartialData { + warnings, _ := annots.AsStrings("", 1, 0) + require.Contains(t, warnings, partialdata.ErrPartialData.Error()) + } + } + }) + } +} + func TestDistributorQuerier_LabelNames(t *testing.T) { t.Parallel() @@ -249,7 +426,7 @@ func TestDistributorQuerier_LabelNames(t *testing.T) { queryable := newDistributorQueryable(d, streamingEnabled, labelNamesWithMatchers, nil, 0, func(string) bool { return partialDataEnabled - }) + }, 1) querier, err := queryable.Querier(mint, maxt) require.NoError(t, err) diff --git a/pkg/querier/querier.go b/pkg/querier/querier.go index f13121caf98..3bf8be517cf 100644 --- a/pkg/querier/querier.go +++ b/pkg/querier/querier.go @@ -78,6 +78,9 @@ type Config struct { // The maximum number of times we attempt fetching missing blocks from different Store Gateways. StoreGatewayConsistencyCheckMaxAttempts int `yaml:"store_gateway_consistency_check_max_attempts"` + // The maximum number of times we attempt fetching data from Ingesters. + IngesterQueryMaxAttempts int `yaml:"ingester_query_max_attempts"` + ShuffleShardingIngestersLookbackPeriod time.Duration `yaml:"shuffle_sharding_ingesters_lookback_period"` // Experimental. Use https://github.com/thanos-io/promql-engine rather than @@ -95,6 +98,7 @@ var ( errEmptyTimeRange = errors.New("empty time range") errUnsupportedResponseCompression = errors.New("unsupported response compression. Supported compression 'gzip' and '' (disable compression)") errInvalidConsistencyCheckAttempts = errors.New("store gateway consistency check max attempts should be greater or equal than 1") + errInvalidIngesterQueryMaxAttempts = errors.New("ingester query max attempts should be greater or equal than 1") ) // RegisterFlags adds the flags required to config this to the given FlagSet. @@ -124,6 +128,7 @@ func (cfg *Config) RegisterFlags(f *flag.FlagSet) { f.StringVar(&cfg.StoreGatewayAddresses, "querier.store-gateway-addresses", "", "Comma separated list of store-gateway addresses in DNS Service Discovery format. This option should be set when using the blocks storage and the store-gateway sharding is disabled (when enabled, the store-gateway instances form a ring and addresses are picked from the ring).") f.BoolVar(&cfg.StoreGatewayQueryStatsEnabled, "querier.store-gateway-query-stats-enabled", true, "If enabled, store gateway query stats will be logged using `info` log level.") f.IntVar(&cfg.StoreGatewayConsistencyCheckMaxAttempts, "querier.store-gateway-consistency-check-max-attempts", maxFetchSeriesAttempts, "The maximum number of times we attempt fetching missing blocks from different store-gateways. If no more store-gateways are left (ie. due to lower replication factor) than we'll end the retries earlier") + f.IntVar(&cfg.IngesterQueryMaxAttempts, "querier.ingester-query-max-attempts", 1, "The maximum number of times we attempt fetching data from ingesters for retryable errors (ex. partial data returned).") f.DurationVar(&cfg.LookbackDelta, "querier.lookback-delta", 5*time.Minute, "Time since the last sample after which a time series is considered stale and ignored by expression evaluations.") f.DurationVar(&cfg.ShuffleShardingIngestersLookbackPeriod, "querier.shuffle-sharding-ingesters-lookback-period", 0, "When distributor's sharding strategy is shuffle-sharding and this setting is > 0, queriers fetch in-memory series from the minimum set of required ingesters, selecting only ingesters which may have received series since 'now - lookback period'. The lookback period should be greater or equal than the configured 'query store after' and 'query ingesters within'. If this setting is 0, queriers always query all ingesters (ingesters shuffle sharding on read path is disabled).") f.BoolVar(&cfg.ThanosEngine, "querier.thanos-engine", false, "Experimental. Use Thanos promql engine https://github.com/thanos-io/promql-engine rather than the Prometheus promql engine.") @@ -155,6 +160,10 @@ func (cfg *Config) Validate() error { return errInvalidConsistencyCheckAttempts } + if cfg.IngesterQueryMaxAttempts < 1 { + return errInvalidIngesterQueryMaxAttempts + } + return nil } @@ -174,7 +183,7 @@ func getChunksIteratorFunction(_ Config) chunkIteratorFunc { func New(cfg Config, limits *validation.Overrides, distributor Distributor, stores []QueryableWithFilter, reg prometheus.Registerer, logger log.Logger, isPartialDataEnabled partialdata.IsCfgEnabledFunc) (storage.SampleAndChunkQueryable, storage.ExemplarQueryable, promql.QueryEngine) { iteratorFunc := getChunksIteratorFunction(cfg) - distributorQueryable := newDistributorQueryable(distributor, cfg.IngesterMetadataStreaming, cfg.IngesterLabelNamesWithMatchers, iteratorFunc, cfg.QueryIngestersWithin, isPartialDataEnabled) + distributorQueryable := newDistributorQueryable(distributor, cfg.IngesterMetadataStreaming, cfg.IngesterLabelNamesWithMatchers, iteratorFunc, cfg.QueryIngestersWithin, isPartialDataEnabled, cfg.IngesterQueryMaxAttempts) ns := make([]QueryableWithFilter, len(stores)) for ix, s := range stores { diff --git a/pkg/querier/querier_test.go b/pkg/querier/querier_test.go index e1e7169eabd..055ae2fab45 100644 --- a/pkg/querier/querier_test.go +++ b/pkg/querier/querier_test.go @@ -300,7 +300,7 @@ func TestShouldSortSeriesIfQueryingMultipleQueryables(t *testing.T) { } distributor.On("QueryStream", mock.Anything, mock.Anything, mock.Anything, mock.Anything).Return(&unorderedResponse, nil) - distributorQueryable := newDistributorQueryable(distributor, cfg.IngesterMetadataStreaming, cfg.IngesterLabelNamesWithMatchers, batch.NewChunkMergeIterator, cfg.QueryIngestersWithin, nil) + distributorQueryable := newDistributorQueryable(distributor, cfg.IngesterMetadataStreaming, cfg.IngesterLabelNamesWithMatchers, batch.NewChunkMergeIterator, cfg.QueryIngestersWithin, nil, 1) tCases := []struct { name string @@ -446,7 +446,7 @@ func TestLimits(t *testing.T) { response: &streamResponse, } - distributorQueryableStreaming := newDistributorQueryable(distributor, cfg.IngesterMetadataStreaming, cfg.IngesterLabelNamesWithMatchers, batch.NewChunkMergeIterator, cfg.QueryIngestersWithin, nil) + distributorQueryableStreaming := newDistributorQueryable(distributor, cfg.IngesterMetadataStreaming, cfg.IngesterLabelNamesWithMatchers, batch.NewChunkMergeIterator, cfg.QueryIngestersWithin, nil, 1) tCases := []struct { name string