From 2d6e6793639085ef506c29d81bd6f123fb3a5bfc Mon Sep 17 00:00:00 2001 From: Marco Pracucci Date: Mon, 26 Apr 2021 19:39:18 +0200 Subject: [PATCH 1/4] Deprecated -store.query-chunk-limit in favour of the new config -querier.max-fetched-chunks-per-query which is applied both to ingesters and long-term storage Signed-off-by: Marco Pracucci --- CHANGELOG.md | 1 + .../config/runtime.yaml | 3 +- .../config/runtime.yaml | 3 +- .../config/runtime.yaml | 3 +- docs/configuration/config-file-reference.md | 21 +++-- pkg/chunk/chunk_store.go | 2 +- pkg/chunk/composite_store.go | 2 +- pkg/chunk/series_store.go | 2 +- pkg/chunk/storage/factory.go | 2 +- pkg/distributor/distributor_test.go | 90 +++++++++++++++---- pkg/distributor/query.go | 32 ++++++- pkg/ingester/client/compat.go | 8 +- pkg/ingester/client/custom.go | 14 +++ pkg/querier/blocks_store_queryable.go | 24 +---- pkg/querier/blocks_store_queryable_test.go | 2 +- pkg/storegateway/bucket_stores.go | 2 +- pkg/storegateway/gateway_test.go | 2 +- pkg/util/labels.go | 19 ++++ pkg/util/labels_test.go | 35 ++++++++ pkg/util/validation/limits.go | 36 +++++--- pkg/util/validation/limits_test.go | 39 ++++++++ 21 files changed, 274 insertions(+), 68 deletions(-) create mode 100644 pkg/ingester/client/custom.go create mode 100644 pkg/util/labels_test.go diff --git a/CHANGELOG.md b/CHANGELOG.md index afb0c6c5201..208f0ae5c31 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -18,6 +18,7 @@ * [CHANGE] Querier: removed the config option `-store.max-look-back-period`, which was deprecated in Cortex 1.6 and was used only by the chunks storage. You should use `-querier.max-query-lookback` instead. #4101 * [CHANGE] Query Frontend: removed the config option `-querier.compress-http-responses`, which was deprecated in Cortex 1.6. You should use`-api.response-compression-enabled` instead. #4101 * [CHANGE] Runtime-config / overrides: removed the config options `-limits.per-user-override-config` (use `-runtime-config.file`) and `-limits.per-user-override-period` (use `-runtime-config.reload-period`), both deprecated since Cortex 0.6.0. #4112 +* [CHANGE] Querier / ruler: deprecated `-store.query-chunk-limit` CLI flag (and its respective YAML config option `max_chunks_per_query`) in favour of `-querier.max-fetched-chunks-per-query` (and its respective YAML config option `max_fetched_chunks_per_query`). The new limit specifies the maximum number of chunks that can be fetched in a single query from ingesters and long-term storage: the total number of actual fetched chunks could be 2x the limit, being independently applied when querying ingesters and long-term storage. #4128 * [FEATURE] The following features have been marked as stable: #4101 - Shuffle-sharding - Querier support for querying chunks and blocks store at the same time diff --git a/development/tsdb-blocks-storage-s3-gossip/config/runtime.yaml b/development/tsdb-blocks-storage-s3-gossip/config/runtime.yaml index 475c2747085..7ae1e506e67 100644 --- a/development/tsdb-blocks-storage-s3-gossip/config/runtime.yaml +++ b/development/tsdb-blocks-storage-s3-gossip/config/runtime.yaml @@ -1 +1,2 @@ -# This file is left empty. It can be configured with overrides or other runtime config. +# This file can be used to set overrides or other runtime config. +ingester_stream_chunks_when_using_blocks: true diff --git a/development/tsdb-blocks-storage-s3-single-binary/config/runtime.yaml b/development/tsdb-blocks-storage-s3-single-binary/config/runtime.yaml index 475c2747085..7ae1e506e67 100644 --- a/development/tsdb-blocks-storage-s3-single-binary/config/runtime.yaml +++ b/development/tsdb-blocks-storage-s3-single-binary/config/runtime.yaml @@ -1 +1,2 @@ -# This file is left empty. It can be configured with overrides or other runtime config. +# This file can be used to set overrides or other runtime config. +ingester_stream_chunks_when_using_blocks: true diff --git a/development/tsdb-blocks-storage-s3/config/runtime.yaml b/development/tsdb-blocks-storage-s3/config/runtime.yaml index 475c2747085..7ae1e506e67 100644 --- a/development/tsdb-blocks-storage-s3/config/runtime.yaml +++ b/development/tsdb-blocks-storage-s3/config/runtime.yaml @@ -1 +1,2 @@ -# This file is left empty. It can be configured with overrides or other runtime config. +# This file can be used to set overrides or other runtime config. +ingester_stream_chunks_when_using_blocks: true diff --git a/docs/configuration/config-file-reference.md b/docs/configuration/config-file-reference.md index e74a6f23a70..c64ffdb6cb9 100644 --- a/docs/configuration/config-file-reference.md +++ b/docs/configuration/config-file-reference.md @@ -4007,14 +4007,25 @@ The `limits_config` configures default and per-tenant limits imposed by Cortex s # CLI flag: -ingester.max-global-metadata-per-metric [max_global_metadata_per_metric: | default = 0] -# Maximum number of chunks that can be fetched in a single query. This limit is -# enforced when fetching chunks from the long-term storage. When running the -# Cortex chunks storage, this limit is enforced in the querier, while when -# running the Cortex blocks storage this limit is both enforced in the querier -# and store-gateway. 0 to disable. +# Deprecated. Use -querier.max-fetched-chunks-per-query CLI flag and its +# respective YAML config option instead. Maximum number of chunks that can be +# fetched in a single query. This limit is enforced when fetching chunks from +# the long-term storage only. When running the Cortex chunks storage, this limit +# is enforced in the querier and ruler, while when running the Cortex blocks +# storage this limit is enforced in the querier, ruler and store-gateway. 0 to +# disable. # CLI flag: -store.query-chunk-limit [max_chunks_per_query: | default = 2000000] +# Maximum number of chunks that can be fetched in a single query from ingesters +# and long-term storage: the total number of actual fetched chunks could be 2x +# the limit, being independently applied when querying ingesters and long-term +# storage. This limit is enforced in the ingester (if chunks streaming is +# enabled), querier, ruler and store-gateway. Takes precedence over the +# deprecated -store.query-chunk-limit. 0 to disable. +# CLI flag: -querier.max-fetched-chunks-per-query +[max_fetched_chunks_per_query: | default = 0] + # Limit how long back data (series and metadata) can be queried, up until # duration ago. This limit is enforced in the query-frontend, querier # and ruler. If the requested time range is outside the allowed range, the diff --git a/pkg/chunk/chunk_store.go b/pkg/chunk/chunk_store.go index 595f79641be..54796b6408b 100644 --- a/pkg/chunk/chunk_store.go +++ b/pkg/chunk/chunk_store.go @@ -355,7 +355,7 @@ func (c *store) getMetricNameChunks(ctx context.Context, userID string, from, th filtered := filterChunksByTime(from, through, chunks) level.Debug(log).Log("Chunks post filtering", len(chunks)) - maxChunksPerQuery := c.limits.MaxChunksPerQuery(userID) + maxChunksPerQuery := c.limits.MaxChunksPerQueryFromStore(userID) if maxChunksPerQuery > 0 && len(filtered) > maxChunksPerQuery { err := QueryError(fmt.Sprintf("Query %v fetched too many chunks (%d > %d)", allMatchers, len(filtered), maxChunksPerQuery)) level.Error(log).Log("err", err) diff --git a/pkg/chunk/composite_store.go b/pkg/chunk/composite_store.go index d3c79013bbf..21b1ec02435 100644 --- a/pkg/chunk/composite_store.go +++ b/pkg/chunk/composite_store.go @@ -14,7 +14,7 @@ import ( // StoreLimits helps get Limits specific to Queries for Stores type StoreLimits interface { - MaxChunksPerQuery(userID string) int + MaxChunksPerQueryFromStore(userID string) int MaxQueryLength(userID string) time.Duration } diff --git a/pkg/chunk/series_store.go b/pkg/chunk/series_store.go index 9fb5b94128b..22ceb43fa11 100644 --- a/pkg/chunk/series_store.go +++ b/pkg/chunk/series_store.go @@ -112,7 +112,7 @@ func (c *seriesStore) Get(ctx context.Context, userID string, from, through mode chunks := chks[0] fetcher := fetchers[0] // Protect ourselves against OOMing. - maxChunksPerQuery := c.limits.MaxChunksPerQuery(userID) + maxChunksPerQuery := c.limits.MaxChunksPerQueryFromStore(userID) if maxChunksPerQuery > 0 && len(chunks) > maxChunksPerQuery { err := QueryError(fmt.Sprintf("Query %v fetched too many chunks (%d > %d)", allMatchers, len(chunks), maxChunksPerQuery)) level.Error(log).Log("err", err) diff --git a/pkg/chunk/storage/factory.go b/pkg/chunk/storage/factory.go index d59e5ec3e14..73c96ed2ed6 100644 --- a/pkg/chunk/storage/factory.go +++ b/pkg/chunk/storage/factory.go @@ -73,7 +73,7 @@ func RegisterIndexStore(name string, indexClientFactory IndexClientFactoryFunc, // StoreLimits helps get Limits specific to Queries for Stores type StoreLimits interface { CardinalityLimit(userID string) int - MaxChunksPerQuery(userID string) int + MaxChunksPerQueryFromStore(userID string) int MaxQueryLength(userID string) time.Duration } diff --git a/pkg/distributor/distributor_test.go b/pkg/distributor/distributor_test.go index c2491c675b1..45b38fd3856 100644 --- a/pkg/distributor/distributor_test.go +++ b/pkg/distributor/distributor_test.go @@ -870,6 +870,60 @@ func TestDistributor_PushQuery(t *testing.T) { } } +func TestDistributor_QueryStream_ShouldReturnErrorIfMaxChunksPerQueryLimitIsReached(t *testing.T) { + const maxChunksLimit = 30 // Chunks are duplicated due to replication factor. + + limits := &validation.Limits{} + flagext.DefaultValues(limits) + limits.MaxChunksPerQuery = maxChunksLimit + + // Prepare distributors. + ds, _, r, _ := prepare(t, prepConfig{ + numIngesters: 3, + happyIngesters: 3, + numDistributors: 1, + shardByAllLabels: true, + limits: limits, + }) + defer stopAll(ds, r) + + // Push a number of series below the max chunks limit. Each series has 1 sample, + // so expect 1 chunk per series when querying back. + initialSeries := maxChunksLimit / 3 + writeReq := makeWriteRequest(0, initialSeries, 0) + writeRes, err := ds[0].Push(ctx, writeReq) + assert.Equal(t, &cortexpb.WriteResponse{}, writeRes) + assert.Nil(t, err) + + allSeriesMatchers := []*labels.Matcher{ + labels.MustNewMatcher(labels.MatchRegexp, model.MetricNameLabel, ".+"), + } + + // Since the number of series (and thus chunks) is equal to the limit (but doesn't + // exceed it), we expect a query running on all series to succeed. + queryRes, err := ds[0].QueryStream(ctx, math.MinInt32, math.MaxInt32, allSeriesMatchers...) + require.NoError(t, err) + assert.Len(t, queryRes.Chunkseries, initialSeries) + + // Push more series to exceed the limit once we'll query back all series. + writeReq = &cortexpb.WriteRequest{} + for i := 0; i < maxChunksLimit; i++ { + writeReq.Timeseries = append(writeReq.Timeseries, + makeWriteRequestTimeseries([]cortexpb.LabelAdapter{{Name: model.MetricNameLabel, Value: fmt.Sprintf("another_series_%d", i)}}, 0, 0), + ) + } + + writeRes, err = ds[0].Push(ctx, writeReq) + assert.Equal(t, &cortexpb.WriteResponse{}, writeRes) + assert.Nil(t, err) + + // Since the number of series (and thus chunks) is exceeding to the limit, we expect + // a query running on all series to fail. + _, err = ds[0].QueryStream(ctx, math.MinInt32, math.MaxInt32, allSeriesMatchers...) + require.Error(t, err) + assert.Contains(t, err.Error(), "the query hit the max number of chunks limit") +} + func TestDistributor_Push_LabelRemoval(t *testing.T) { ctx = user.InjectOrgID(context.Background(), "user") @@ -1754,22 +1808,12 @@ func stopAll(ds []*Distributor, r *ring.Ring) { func makeWriteRequest(startTimestampMs int64, samples int, metadata int) *cortexpb.WriteRequest { request := &cortexpb.WriteRequest{} for i := 0; i < samples; i++ { - ts := cortexpb.PreallocTimeseries{ - TimeSeries: &cortexpb.TimeSeries{ - Labels: []cortexpb.LabelAdapter{ - {Name: model.MetricNameLabel, Value: "foo"}, - {Name: "bar", Value: "baz"}, - {Name: "sample", Value: fmt.Sprintf("%d", i)}, - }, - }, - } - ts.Samples = []cortexpb.Sample{ - { - Value: float64(i), - TimestampMs: startTimestampMs + int64(i), - }, - } - request.Timeseries = append(request.Timeseries, ts) + request.Timeseries = append(request.Timeseries, makeWriteRequestTimeseries( + []cortexpb.LabelAdapter{ + {Name: model.MetricNameLabel, Value: "foo"}, + {Name: "bar", Value: "baz"}, + {Name: "sample", Value: fmt.Sprintf("%d", i)}, + }, startTimestampMs+int64(i), float64(i))) } for i := 0; i < metadata; i++ { @@ -1784,6 +1828,20 @@ func makeWriteRequest(startTimestampMs int64, samples int, metadata int) *cortex return request } +func makeWriteRequestTimeseries(labels []cortexpb.LabelAdapter, ts int64, value float64) cortexpb.PreallocTimeseries { + return cortexpb.PreallocTimeseries{ + TimeSeries: &cortexpb.TimeSeries{ + Labels: labels, + Samples: []cortexpb.Sample{ + { + Value: value, + TimestampMs: ts, + }, + }, + }, + } +} + func makeWriteRequestHA(samples int, replica, cluster string) *cortexpb.WriteRequest { request := &cortexpb.WriteRequest{} for i := 0; i < samples; i++ { diff --git a/pkg/distributor/query.go b/pkg/distributor/query.go index cda633dbcaf..2ad91d45d22 100644 --- a/pkg/distributor/query.go +++ b/pkg/distributor/query.go @@ -2,6 +2,7 @@ package distributor import ( "context" + "fmt" "io" "time" @@ -9,6 +10,7 @@ import ( "github.com/prometheus/common/model" "github.com/prometheus/prometheus/pkg/labels" "github.com/weaveworks/common/instrument" + "go.uber.org/atomic" "github.com/cortexproject/cortex/pkg/cortexpb" ingester_client "github.com/cortexproject/cortex/pkg/ingester/client" @@ -17,6 +19,11 @@ import ( "github.com/cortexproject/cortex/pkg/util" "github.com/cortexproject/cortex/pkg/util/extract" grpc_util "github.com/cortexproject/cortex/pkg/util/grpc" + "github.com/cortexproject/cortex/pkg/util/validation" +) + +var ( + errMaxChunksPerQueryLimit = "the query hit the max number of chunks limit while fetching chunks from ingesters for %s (limit: %d)" ) // Query multiple ingesters and returns a Matrix of samples. @@ -50,6 +57,11 @@ func (d *Distributor) Query(ctx context.Context, from, to model.Time, matchers . func (d *Distributor) QueryStream(ctx context.Context, from, to model.Time, matchers ...*labels.Matcher) (*ingester_client.QueryStreamResponse, error) { var result *ingester_client.QueryStreamResponse err := instrument.CollectedRequest(ctx, "Distributor.QueryStream", d.queryDuration, instrument.ErrorCode, func(ctx context.Context) error { + userID, err := tenant.TenantID(ctx) + if err != nil { + return err + } + req, err := ingester_client.ToQueryRequest(from, to, matchers) if err != nil { return err @@ -60,7 +72,7 @@ func (d *Distributor) QueryStream(ctx context.Context, from, to model.Time, matc return err } - result, err = d.queryIngesterStream(ctx, replicationSet, req) + result, err = d.queryIngesterStream(ctx, userID, replicationSet, req) if err != nil { return err } @@ -173,7 +185,12 @@ func (d *Distributor) queryIngesters(ctx context.Context, replicationSet ring.Re } // queryIngesterStream queries the ingesters using the new streaming API. -func (d *Distributor) queryIngesterStream(ctx context.Context, replicationSet ring.ReplicationSet, req *ingester_client.QueryRequest) (*ingester_client.QueryStreamResponse, error) { +func (d *Distributor) queryIngesterStream(ctx context.Context, userID string, replicationSet ring.ReplicationSet, req *ingester_client.QueryRequest) (*ingester_client.QueryStreamResponse, error) { + var ( + maxChunksLimit = d.limits.MaxChunksPerQueryFromIngesters(userID) + totChunksCount = atomic.Int32{} + ) + // Fetch samples from multiple ingesters results, err := replicationSet.Do(ctx, d.cfg.ExtraQueryDelay, func(ctx context.Context, ing *ring.InstanceDesc) (interface{}, error) { client, err := d.ingesterPool.GetClientFor(ing.Addr) @@ -203,6 +220,17 @@ func (d *Distributor) queryIngesterStream(ctx context.Context, replicationSet ri return nil, err } + // Enforce the max chunks limits. + if maxChunksLimit > 0 { + if totChunks := int(totChunksCount.Add(int32(resp.ChunksCount()))); totChunks > maxChunksLimit { + // We expect to be always able to convert the label matchers back to Prometheus ones. + // In case we fail (unexpected) the error will not include the matchers, but the core + // logic doesn't break. + matchers, _ := ingester_client.FromLabelMatchers(req.Matchers) + return nil, validation.LimitError(fmt.Sprintf(errMaxChunksPerQueryLimit, util.LabelMatchersToString(matchers), maxChunksLimit)) + } + } + result.Chunkseries = append(result.Chunkseries, resp.Chunkseries...) result.Timeseries = append(result.Timeseries, resp.Timeseries...) } diff --git a/pkg/ingester/client/compat.go b/pkg/ingester/client/compat.go index 751566a8748..78095ff32ec 100644 --- a/pkg/ingester/client/compat.go +++ b/pkg/ingester/client/compat.go @@ -25,7 +25,7 @@ func ToQueryRequest(from, to model.Time, matchers []*labels.Matcher) (*QueryRequ // FromQueryRequest unpacks a QueryRequest proto. func FromQueryRequest(req *QueryRequest) (model.Time, model.Time, []*labels.Matcher, error) { - matchers, err := fromLabelMatchers(req.Matchers) + matchers, err := FromLabelMatchers(req.Matchers) if err != nil { return 0, 0, nil, err } @@ -90,7 +90,7 @@ func ToMetricsForLabelMatchersRequest(from, to model.Time, matchers []*labels.Ma func FromMetricsForLabelMatchersRequest(req *MetricsForLabelMatchersRequest) (model.Time, model.Time, [][]*labels.Matcher, error) { matchersSet := make([][]*labels.Matcher, 0, len(req.MatchersSet)) for _, matchers := range req.MatchersSet { - matchers, err := fromLabelMatchers(matchers.Matchers) + matchers, err := FromLabelMatchers(matchers.Matchers) if err != nil { return 0, 0, nil, err } @@ -131,7 +131,7 @@ func FromLabelValuesRequest(req *LabelValuesRequest) (string, int64, int64, []*l var matchers []*labels.Matcher if req.Matchers != nil { - matchers, err = fromLabelMatchers(req.Matchers.Matchers) + matchers, err = FromLabelMatchers(req.Matchers.Matchers) if err != nil { return "", 0, 0, nil, err } @@ -165,7 +165,7 @@ func toLabelMatchers(matchers []*labels.Matcher) ([]*LabelMatcher, error) { return result, nil } -func fromLabelMatchers(matchers []*LabelMatcher) ([]*labels.Matcher, error) { +func FromLabelMatchers(matchers []*LabelMatcher) ([]*labels.Matcher, error) { result := make([]*labels.Matcher, 0, len(matchers)) for _, matcher := range matchers { var mtype labels.MatchType diff --git a/pkg/ingester/client/custom.go b/pkg/ingester/client/custom.go new file mode 100644 index 00000000000..becaab3c022 --- /dev/null +++ b/pkg/ingester/client/custom.go @@ -0,0 +1,14 @@ +package client + +// ChunksCount returns the number of chunks in response. +func (m *QueryStreamResponse) ChunksCount() int { + if len(m.Chunkseries) == 0 { + return 0 + } + + count := 0 + for _, entry := range m.Chunkseries { + count += len(entry.Chunks) + } + return count +} diff --git a/pkg/querier/blocks_store_queryable.go b/pkg/querier/blocks_store_queryable.go index 42121e951db..072647a4864 100644 --- a/pkg/querier/blocks_store_queryable.go +++ b/pkg/querier/blocks_store_queryable.go @@ -53,7 +53,7 @@ const ( var ( errNoStoreGatewayAddress = errors.New("no store-gateway address configured") - errMaxChunksPerQueryLimit = "the query hit the max number of chunks limit while fetching chunks for %s (limit: %d)" + errMaxChunksPerQueryLimit = "the query hit the max number of chunks limit while fetching chunks from store-gateways for %s (limit: %d)" ) // BlocksStoreSet is the interface used to get the clients to query series on a set of blocks. @@ -89,7 +89,7 @@ type BlocksStoreClient interface { type BlocksStoreLimits interface { bucket.TenantConfigProvider - MaxChunksPerQuery(userID string) int + MaxChunksPerQueryFromStore(userID string) int StoreGatewayTenantShardSize(userID string) int } @@ -401,7 +401,7 @@ func (q *blocksStoreQuerier) selectSorted(sp *storage.SelectHints, matchers ...* resSeriesSets = []storage.SeriesSet(nil) resWarnings = storage.Warnings(nil) - maxChunksLimit = q.limits.MaxChunksPerQuery(q.userID) + maxChunksLimit = q.limits.MaxChunksPerQueryFromStore(q.userID) leftChunksLimit = maxChunksLimit resultMtx sync.Mutex @@ -615,7 +615,7 @@ func (q *blocksStoreQuerier) fetchSeriesFromStores( if maxChunksLimit > 0 { actual := numChunks.Add(int32(len(s.Chunks))) if actual > int32(leftChunksLimit) { - return validation.LimitError(fmt.Sprintf(errMaxChunksPerQueryLimit, convertMatchersToString(matchers), maxChunksLimit)) + return validation.LimitError(fmt.Sprintf(errMaxChunksPerQueryLimit, util.LabelMatchersToString(matchers), maxChunksLimit)) } } } @@ -937,19 +937,3 @@ func countSeriesBytes(series []*storepb.Series) (count uint64) { return count } - -func convertMatchersToString(matchers []*labels.Matcher) string { - out := strings.Builder{} - out.WriteRune('{') - - for idx, m := range matchers { - if idx > 0 { - out.WriteRune(',') - } - - out.WriteString(m.String()) - } - - out.WriteRune('}') - return out.String() -} diff --git a/pkg/querier/blocks_store_queryable_test.go b/pkg/querier/blocks_store_queryable_test.go index e600df78d87..a777d5041c7 100644 --- a/pkg/querier/blocks_store_queryable_test.go +++ b/pkg/querier/blocks_store_queryable_test.go @@ -1300,7 +1300,7 @@ type blocksStoreLimitsMock struct { storeGatewayTenantShardSize int } -func (m *blocksStoreLimitsMock) MaxChunksPerQuery(_ string) int { +func (m *blocksStoreLimitsMock) MaxChunksPerQueryFromStore(_ string) int { return m.maxChunksPerQuery } diff --git a/pkg/storegateway/bucket_stores.go b/pkg/storegateway/bucket_stores.go index 88a05e73ae0..3c3da94b06a 100644 --- a/pkg/storegateway/bucket_stores.go +++ b/pkg/storegateway/bucket_stores.go @@ -623,7 +623,7 @@ func newChunksLimiterFactory(limits *validation.Overrides, userID string) store. // Since limit overrides could be live reloaded, we have to get the current user's limit // each time a new limiter is instantiated. return &chunkLimiter{ - limiter: store.NewLimiter(uint64(limits.MaxChunksPerQuery(userID)), failedCounter), + limiter: store.NewLimiter(uint64(limits.MaxChunksPerQueryFromStore(userID)), failedCounter), } } } diff --git a/pkg/storegateway/gateway_test.go b/pkg/storegateway/gateway_test.go index 7dee234faae..a3ba3aa8145 100644 --- a/pkg/storegateway/gateway_test.go +++ b/pkg/storegateway/gateway_test.go @@ -788,7 +788,7 @@ func TestStoreGateway_SeriesQueryingShouldEnforceMaxChunksPerQueryLimit(t *testi t.Run(testName, func(t *testing.T) { // Customise the limits. limits := defaultLimitsConfig() - limits.MaxChunksPerQuery = testData.limit + limits.MaxChunksPerQueryFromStore = testData.limit overrides, err := validation.NewOverrides(limits, nil) require.NoError(t, err) diff --git a/pkg/util/labels.go b/pkg/util/labels.go index 2177268af0b..e77b9a6dbeb 100644 --- a/pkg/util/labels.go +++ b/pkg/util/labels.go @@ -1,6 +1,8 @@ package util import ( + "strings" + "github.com/prometheus/common/model" "github.com/prometheus/prometheus/pkg/labels" ) @@ -14,3 +16,20 @@ func LabelsToMetric(ls labels.Labels) model.Metric { } return m } + +// LabelMatchersToString returns a string representing the input label matchers. +func LabelMatchersToString(matchers []*labels.Matcher) string { + out := strings.Builder{} + out.WriteRune('{') + + for idx, m := range matchers { + if idx > 0 { + out.WriteRune(',') + } + + out.WriteString(m.String()) + } + + out.WriteRune('}') + return out.String() +} diff --git a/pkg/util/labels_test.go b/pkg/util/labels_test.go new file mode 100644 index 00000000000..bbf999200ae --- /dev/null +++ b/pkg/util/labels_test.go @@ -0,0 +1,35 @@ +package util + +import ( + "testing" + + "github.com/prometheus/prometheus/pkg/labels" + "github.com/stretchr/testify/assert" +) + +func TestLabelMatchersToString(t *testing.T) { + tests := []struct { + input []*labels.Matcher + expected string + }{ + { + input: nil, + expected: "{}", + }, { + input: []*labels.Matcher{ + labels.MustNewMatcher(labels.MatchEqual, "foo", "bar"), + }, + expected: `{foo="bar"}`, + }, { + input: []*labels.Matcher{ + labels.MustNewMatcher(labels.MatchEqual, "foo", "bar"), + labels.MustNewMatcher(labels.MatchNotEqual, "who", "boh"), + }, + expected: `{foo="bar",who!="boh"}`, + }, + } + + for _, tc := range tests { + assert.Equal(t, tc.expected, LabelMatchersToString(tc.input)) + } +} diff --git a/pkg/util/validation/limits.go b/pkg/util/validation/limits.go index 8f74e390850..748c102ef8e 100644 --- a/pkg/util/validation/limits.go +++ b/pkg/util/validation/limits.go @@ -69,13 +69,14 @@ type Limits struct { MaxGlobalMetadataPerMetric int `yaml:"max_global_metadata_per_metric" json:"max_global_metadata_per_metric"` // Querier enforced limits. - MaxChunksPerQuery int `yaml:"max_chunks_per_query" json:"max_chunks_per_query"` - MaxQueryLookback model.Duration `yaml:"max_query_lookback" json:"max_query_lookback"` - MaxQueryLength model.Duration `yaml:"max_query_length" json:"max_query_length"` - MaxQueryParallelism int `yaml:"max_query_parallelism" json:"max_query_parallelism"` - CardinalityLimit int `yaml:"cardinality_limit" json:"cardinality_limit"` - MaxCacheFreshness model.Duration `yaml:"max_cache_freshness" json:"max_cache_freshness"` - MaxQueriersPerTenant int `yaml:"max_queriers_per_tenant" json:"max_queriers_per_tenant"` + MaxChunksPerQueryFromStore int `yaml:"max_chunks_per_query" json:"max_chunks_per_query"` // TODO Remove in Cortex 1.12. + MaxChunksPerQuery int `yaml:"max_fetched_chunks_per_query" json:"max_fetched_chunks_per_query"` + MaxQueryLookback model.Duration `yaml:"max_query_lookback" json:"max_query_lookback"` + MaxQueryLength model.Duration `yaml:"max_query_length" json:"max_query_length"` + MaxQueryParallelism int `yaml:"max_query_parallelism" json:"max_query_parallelism"` + CardinalityLimit int `yaml:"cardinality_limit" json:"cardinality_limit"` + MaxCacheFreshness model.Duration `yaml:"max_cache_freshness" json:"max_cache_freshness"` + MaxQueriersPerTenant int `yaml:"max_queriers_per_tenant" json:"max_queriers_per_tenant"` // Ruler defaults and limits. RulerEvaluationDelay model.Duration `yaml:"ruler_evaluation_delay_duration" json:"ruler_evaluation_delay_duration"` @@ -131,8 +132,8 @@ func (l *Limits) RegisterFlags(f *flag.FlagSet) { f.IntVar(&l.MaxLocalMetadataPerMetric, "ingester.max-metadata-per-metric", 10, "The maximum number of metadata per metric, per ingester. 0 to disable.") f.IntVar(&l.MaxGlobalMetricsWithMetadataPerUser, "ingester.max-global-metadata-per-user", 0, "The maximum number of active metrics with metadata per user, across the cluster. 0 to disable. Supported only if -distributor.shard-by-all-labels is true.") f.IntVar(&l.MaxGlobalMetadataPerMetric, "ingester.max-global-metadata-per-metric", 0, "The maximum number of metadata per metric, across the cluster. 0 to disable.") - - f.IntVar(&l.MaxChunksPerQuery, "store.query-chunk-limit", 2e6, "Maximum number of chunks that can be fetched in a single query. This limit is enforced when fetching chunks from the long-term storage. When running the Cortex chunks storage, this limit is enforced in the querier, while when running the Cortex blocks storage this limit is both enforced in the querier and store-gateway. 0 to disable.") + f.IntVar(&l.MaxChunksPerQueryFromStore, "store.query-chunk-limit", 2e6, "Deprecated. Use -querier.max-fetched-chunks-per-query CLI flag and its respective YAML config option instead. Maximum number of chunks that can be fetched in a single query. This limit is enforced when fetching chunks from the long-term storage only. When running the Cortex chunks storage, this limit is enforced in the querier and ruler, while when running the Cortex blocks storage this limit is enforced in the querier, ruler and store-gateway. 0 to disable.") + f.IntVar(&l.MaxChunksPerQuery, "querier.max-fetched-chunks-per-query", 0, "Maximum number of chunks that can be fetched in a single query from ingesters and long-term storage: the total number of actual fetched chunks could be 2x the limit, being independently applied when querying ingesters and long-term storage. This limit is enforced in the ingester (if chunks streaming is enabled), querier, ruler and store-gateway. Takes precedence over the deprecated -store.query-chunk-limit. 0 to disable.") f.Var(&l.MaxQueryLength, "store.max-query-length", "Limit the query time range (end - start time). This limit is enforced in the query-frontend (on the received query), in the querier (on the query possibly split by the query-frontend) and in the chunks storage. 0 to disable.") f.Var(&l.MaxQueryLookback, "querier.max-query-lookback", "Limit how long back data (series and metadata) can be queried, up until duration ago. This limit is enforced in the query-frontend, querier and ruler. If the requested time range is outside the allowed range, the request will not fail but will be manipulated to only query data within the allowed time range. 0 to disable.") f.IntVar(&l.MaxQueryParallelism, "querier.max-query-parallelism", 14, "Maximum number of split queries will be scheduled in parallel by the frontend.") @@ -334,8 +335,21 @@ func (o *Overrides) MaxGlobalSeriesPerMetric(userID string) int { return o.getOverridesForUser(userID).MaxGlobalSeriesPerMetric } -// MaxChunksPerQuery returns the maximum number of chunks allowed per query. -func (o *Overrides) MaxChunksPerQuery(userID string) int { +// MaxChunksPerQueryFromStore returns the maximum number of chunks allowed per query when fetching +// chunks from the long-term storage. +func (o *Overrides) MaxChunksPerQueryFromStore(userID string) int { + // If the new config option is set, then it should take precedence. + if value := o.getOverridesForUser(userID).MaxChunksPerQuery; value > 0 { + return value + } + + // Fallback to the deprecated config option. + return o.getOverridesForUser(userID).MaxChunksPerQueryFromStore +} + +// MaxChunksPerQueryFromStore returns the maximum number of chunks allowed per query when fetching +// chunks from ingesters. +func (o *Overrides) MaxChunksPerQueryFromIngesters(userID string) int { return o.getOverridesForUser(userID).MaxChunksPerQuery } diff --git a/pkg/util/validation/limits_test.go b/pkg/util/validation/limits_test.go index 7306341eb57..f9551c88d63 100644 --- a/pkg/util/validation/limits_test.go +++ b/pkg/util/validation/limits_test.go @@ -11,6 +11,8 @@ import ( "github.com/stretchr/testify/assert" "github.com/stretchr/testify/require" "gopkg.in/yaml.v2" + + "github.com/cortexproject/cortex/pkg/util/flagext" ) // mockTenantLimits exposes per-tenant limits based on a provided map @@ -68,6 +70,43 @@ func TestLimits_Validate(t *testing.T) { } } +func TestOverrides_MaxChunksPerQueryFromStore(t *testing.T) { + tests := map[string]struct { + setup func(limits *Limits) + expected int + }{ + "should return the default legacy setting with the default config": { + setup: func(limits *Limits) {}, + expected: 2000000, + }, + "the new config option should take precedence over the deprecated one": { + setup: func(limits *Limits) { + limits.MaxChunksPerQueryFromStore = 10 + limits.MaxChunksPerQuery = 20 + }, + expected: 20, + }, + "the deprecated config option should be used if the new config option is unset": { + setup: func(limits *Limits) { + limits.MaxChunksPerQueryFromStore = 10 + }, + expected: 10, + }, + } + + for testName, testData := range tests { + t.Run(testName, func(t *testing.T) { + limits := Limits{} + flagext.DefaultValues(&limits) + testData.setup(&limits) + + overrides, err := NewOverrides(limits, nil) + require.NoError(t, err) + assert.Equal(t, testData.expected, overrides.MaxChunksPerQueryFromStore("test")) + }) + } +} + func TestOverridesManager_GetOverrides(t *testing.T) { tenantLimits := map[string]*Limits{} From 0d69f5c4ac1f47a93c91bcd4f9f471a8d5dcf10d Mon Sep 17 00:00:00 2001 From: Marco Pracucci Date: Tue, 27 Apr 2021 11:02:25 +0200 Subject: [PATCH 2/4] Fixed PR number in CHANGELOG Signed-off-by: Marco Pracucci --- CHANGELOG.md | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/CHANGELOG.md b/CHANGELOG.md index 208f0ae5c31..5d856ed4f4d 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -18,7 +18,7 @@ * [CHANGE] Querier: removed the config option `-store.max-look-back-period`, which was deprecated in Cortex 1.6 and was used only by the chunks storage. You should use `-querier.max-query-lookback` instead. #4101 * [CHANGE] Query Frontend: removed the config option `-querier.compress-http-responses`, which was deprecated in Cortex 1.6. You should use`-api.response-compression-enabled` instead. #4101 * [CHANGE] Runtime-config / overrides: removed the config options `-limits.per-user-override-config` (use `-runtime-config.file`) and `-limits.per-user-override-period` (use `-runtime-config.reload-period`), both deprecated since Cortex 0.6.0. #4112 -* [CHANGE] Querier / ruler: deprecated `-store.query-chunk-limit` CLI flag (and its respective YAML config option `max_chunks_per_query`) in favour of `-querier.max-fetched-chunks-per-query` (and its respective YAML config option `max_fetched_chunks_per_query`). The new limit specifies the maximum number of chunks that can be fetched in a single query from ingesters and long-term storage: the total number of actual fetched chunks could be 2x the limit, being independently applied when querying ingesters and long-term storage. #4128 +* [CHANGE] Querier / ruler: deprecated `-store.query-chunk-limit` CLI flag (and its respective YAML config option `max_chunks_per_query`) in favour of `-querier.max-fetched-chunks-per-query` (and its respective YAML config option `max_fetched_chunks_per_query`). The new limit specifies the maximum number of chunks that can be fetched in a single query from ingesters and long-term storage: the total number of actual fetched chunks could be 2x the limit, being independently applied when querying ingesters and long-term storage. #4125 * [FEATURE] The following features have been marked as stable: #4101 - Shuffle-sharding - Querier support for querying chunks and blocks store at the same time From 6307206967f6814b30d291f397e058b99200e089 Mon Sep 17 00:00:00 2001 From: Marco Pracucci Date: Wed, 28 Apr 2021 17:19:23 +0200 Subject: [PATCH 3/4] Moved CHANGELOG entry to unreleased Signed-off-by: Marco Pracucci --- CHANGELOG.md | 3 ++- 1 file changed, 2 insertions(+), 1 deletion(-) diff --git a/CHANGELOG.md b/CHANGELOG.md index 5d856ed4f4d..d86af798c59 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -2,6 +2,8 @@ ## master / unreleased +* [CHANGE] Querier / ruler: deprecated `-store.query-chunk-limit` CLI flag (and its respective YAML config option `max_chunks_per_query`) in favour of `-querier.max-fetched-chunks-per-query` (and its respective YAML config option `max_fetched_chunks_per_query`). The new limit specifies the maximum number of chunks that can be fetched in a single query from ingesters and long-term storage: the total number of actual fetched chunks could be 2x the limit, being independently applied when querying ingesters and long-term storage. #4125 + ## 1.9.0 in progress * [CHANGE] Fix for CVE-2021-31232: Local file disclosure vulnerability when `-experimental.alertmanager.enable-api` is used. The HTTP basic auth `password_file` can be used as an attack vector to send any file content via a webhook. The alertmanager templates can be used as an attack vector to send any file content because the alertmanager can load any text file specified in the templates list. #4129 @@ -18,7 +20,6 @@ * [CHANGE] Querier: removed the config option `-store.max-look-back-period`, which was deprecated in Cortex 1.6 and was used only by the chunks storage. You should use `-querier.max-query-lookback` instead. #4101 * [CHANGE] Query Frontend: removed the config option `-querier.compress-http-responses`, which was deprecated in Cortex 1.6. You should use`-api.response-compression-enabled` instead. #4101 * [CHANGE] Runtime-config / overrides: removed the config options `-limits.per-user-override-config` (use `-runtime-config.file`) and `-limits.per-user-override-period` (use `-runtime-config.reload-period`), both deprecated since Cortex 0.6.0. #4112 -* [CHANGE] Querier / ruler: deprecated `-store.query-chunk-limit` CLI flag (and its respective YAML config option `max_chunks_per_query`) in favour of `-querier.max-fetched-chunks-per-query` (and its respective YAML config option `max_fetched_chunks_per_query`). The new limit specifies the maximum number of chunks that can be fetched in a single query from ingesters and long-term storage: the total number of actual fetched chunks could be 2x the limit, being independently applied when querying ingesters and long-term storage. #4125 * [FEATURE] The following features have been marked as stable: #4101 - Shuffle-sharding - Querier support for querying chunks and blocks store at the same time From a7e3cc8aa6ae42e2ca4eca816966935f5cb3b3b9 Mon Sep 17 00:00:00 2001 From: Marco Pracucci Date: Wed, 28 Apr 2021 17:24:10 +0200 Subject: [PATCH 4/4] Addressed review feedback Signed-off-by: Marco Pracucci --- pkg/distributor/query.go | 10 +++++----- pkg/util/labels_test.go | 6 ++++++ 2 files changed, 11 insertions(+), 5 deletions(-) diff --git a/pkg/distributor/query.go b/pkg/distributor/query.go index 2ad91d45d22..0b20d023128 100644 --- a/pkg/distributor/query.go +++ b/pkg/distributor/query.go @@ -187,8 +187,8 @@ func (d *Distributor) queryIngesters(ctx context.Context, replicationSet ring.Re // queryIngesterStream queries the ingesters using the new streaming API. func (d *Distributor) queryIngesterStream(ctx context.Context, userID string, replicationSet ring.ReplicationSet, req *ingester_client.QueryRequest) (*ingester_client.QueryStreamResponse, error) { var ( - maxChunksLimit = d.limits.MaxChunksPerQueryFromIngesters(userID) - totChunksCount = atomic.Int32{} + chunksLimit = d.limits.MaxChunksPerQueryFromIngesters(userID) + chunksCount = atomic.Int32{} ) // Fetch samples from multiple ingesters @@ -221,13 +221,13 @@ func (d *Distributor) queryIngesterStream(ctx context.Context, userID string, re } // Enforce the max chunks limits. - if maxChunksLimit > 0 { - if totChunks := int(totChunksCount.Add(int32(resp.ChunksCount()))); totChunks > maxChunksLimit { + if chunksLimit > 0 { + if count := int(chunksCount.Add(int32(resp.ChunksCount()))); count > chunksLimit { // We expect to be always able to convert the label matchers back to Prometheus ones. // In case we fail (unexpected) the error will not include the matchers, but the core // logic doesn't break. matchers, _ := ingester_client.FromLabelMatchers(req.Matchers) - return nil, validation.LimitError(fmt.Sprintf(errMaxChunksPerQueryLimit, util.LabelMatchersToString(matchers), maxChunksLimit)) + return nil, validation.LimitError(fmt.Sprintf(errMaxChunksPerQueryLimit, util.LabelMatchersToString(matchers), chunksLimit)) } } diff --git a/pkg/util/labels_test.go b/pkg/util/labels_test.go index bbf999200ae..70f7eb4faf7 100644 --- a/pkg/util/labels_test.go +++ b/pkg/util/labels_test.go @@ -26,6 +26,12 @@ func TestLabelMatchersToString(t *testing.T) { labels.MustNewMatcher(labels.MatchNotEqual, "who", "boh"), }, expected: `{foo="bar",who!="boh"}`, + }, { + input: []*labels.Matcher{ + labels.MustNewMatcher(labels.MatchEqual, labels.MetricName, "metric"), + labels.MustNewMatcher(labels.MatchNotEqual, "who", "boh"), + }, + expected: `{__name__="metric",who!="boh"}`, }, }