From 727e9ad77c4a8ae8bed670bcdce687b77b765126 Mon Sep 17 00:00:00 2001
From: Alan Protasio
Date: Thu, 3 Jun 2021 11:01:47 -0700
Subject: [PATCH 1/4] max chunks per query limit shared between ingesters and
 storage gateways

Signed-off-by: Alan Protasio
---
 CHANGELOG.md                                |  1 -
 docs/configuration/config-file-reference.md |  8 +++---
 pkg/distributor/distributor_test.go         |  6 +++--
 pkg/distributor/query.go                    | 28 ++++-----------------
 pkg/querier/blocks_store_queryable.go       | 23 ++++-------------
 pkg/querier/blocks_store_queryable_test.go  | 14 +++++------
 pkg/querier/querier.go                      |  2 +-
 pkg/util/limiter/query_limiter.go           | 27 ++++++++++++++++----
 pkg/util/limiter/query_limiter_test.go      |  8 +++---
 pkg/util/validation/limits.go               |  8 +-----
 10 files changed, 52 insertions(+), 73 deletions(-)

diff --git a/CHANGELOG.md b/CHANGELOG.md
index 6c1e701e8a6..c346292e3a9 100644
--- a/CHANGELOG.md
+++ b/CHANGELOG.md
@@ -63,7 +63,6 @@
 * [BUGFIX] Store-gateway: when blocks sharding is enabled, do not load all blocks in each store-gateway in case of a cold startup, but load only blocks owned by the store-gateway replica. #4271
 * [BUGFIX] Memberlist: fix to setting the default configuration value for `-memberlist.retransmit-factor` when not provided. This should improve propagation delay of the ring state (including, but not limited to, tombstones). Note that if the configuration is already explicitly given, this fix has no effect. #4269
 * [BUGFIX] Querier: Fix issue where samples in a chunk might get skipped by batch iterator. #4218
-
 ## Blocksconvert

 * [ENHANCEMENT] Scanner: add support for DynamoDB (v9 schema only). #3828

diff --git a/docs/configuration/config-file-reference.md b/docs/configuration/config-file-reference.md
index 801dc78f46e..d62d5bb898a 100644
--- a/docs/configuration/config-file-reference.md
+++ b/docs/configuration/config-file-reference.md
@@ -4043,11 +4043,9 @@ The `limits_config` configures default and per-tenant limits imposed by Cortex s
 [max_chunks_per_query: <int> | default = 2000000]

 # Maximum number of chunks that can be fetched in a single query from ingesters
-# and long-term storage: the total number of actual fetched chunks could be 2x
-# the limit, being independently applied when querying ingesters and long-term
-# storage. This limit is enforced in the ingester (if chunks streaming is
-# enabled), querier, ruler and store-gateway. Takes precedence over the
-# deprecated -store.query-chunk-limit. 0 to disable.
+# and long-term storage. This limit is enforced in the ingester (if chunks
+# streaming is enabled), querier, ruler and store-gateway. Takes precedence over
+# the deprecated -store.query-chunk-limit. 0 to disable.
 # CLI flag: -querier.max-fetched-chunks-per-query
 [max_fetched_chunks_per_query: <int> | default = 0]

diff --git a/pkg/distributor/distributor_test.go b/pkg/distributor/distributor_test.go
index 4ba14fe6f4b..2b0fab3c4f2 100644
--- a/pkg/distributor/distributor_test.go
+++ b/pkg/distributor/distributor_test.go
@@ -912,6 +912,8 @@ func TestDistributor_QueryStream_ShouldReturnErrorIfMaxChunksPerQueryLimitIsReac
 		shardByAllLabels: true,
 		limits:           limits,
 	})
+
+	ctx = limiter.AddQueryLimiterToContext(ctx, limiter.NewQueryLimiter(0, 0, maxChunksLimit))
 	defer stopAll(ds, r)

 	// Push a number of series below the max chunks limit.
Each series has 1 sample, @@ -957,7 +959,7 @@ func TestDistributor_QueryStream_ShouldReturnErrorIfMaxSeriesPerQueryLimitIsReac ctx := user.InjectOrgID(context.Background(), "user") limits := &validation.Limits{} flagext.DefaultValues(limits) - ctx = limiter.AddQueryLimiterToContext(ctx, limiter.NewQueryLimiter(maxSeriesLimit, 0)) + ctx = limiter.AddQueryLimiterToContext(ctx, limiter.NewQueryLimiter(maxSeriesLimit, 0, 0)) // Prepare distributors. ds, _, r, _ := prepare(t, prepConfig{ @@ -1043,7 +1045,7 @@ func TestDistributor_QueryStream_ShouldReturnErrorIfMaxChunkBytesPerQueryLimitIs var maxBytesLimit = (seriesToAdd) * responseChunkSize // Update the limiter with the calculated limits. - ctx = limiter.AddQueryLimiterToContext(ctx, limiter.NewQueryLimiter(0, maxBytesLimit)) + ctx = limiter.AddQueryLimiterToContext(ctx, limiter.NewQueryLimiter(0, maxBytesLimit, 0)) // Push a number of series below the max chunk bytes limit. Subtract one for the series added above. writeReq = makeWriteRequest(0, seriesToAdd-1, 0) diff --git a/pkg/distributor/query.go b/pkg/distributor/query.go index 2e405e1a8b6..abd69f1da20 100644 --- a/pkg/distributor/query.go +++ b/pkg/distributor/query.go @@ -2,7 +2,6 @@ package distributor import ( "context" - "fmt" "io" "sort" "time" @@ -11,7 +10,6 @@ import ( "github.com/prometheus/common/model" "github.com/prometheus/prometheus/pkg/labels" "github.com/weaveworks/common/instrument" - "go.uber.org/atomic" "github.com/cortexproject/cortex/pkg/cortexpb" ingester_client "github.com/cortexproject/cortex/pkg/ingester/client" @@ -24,10 +22,6 @@ import ( "github.com/cortexproject/cortex/pkg/util/validation" ) -var ( - errMaxChunksPerQueryLimit = "the query hit the max number of chunks limit while fetching chunks from ingesters for %s (limit: %d)" -) - // Query multiple ingesters and returns a Matrix of samples. func (d *Distributor) Query(ctx context.Context, from, to model.Time, matchers ...*labels.Matcher) (model.Matrix, error) { var matrix model.Matrix @@ -86,11 +80,6 @@ func (d *Distributor) QueryExemplars(ctx context.Context, from, to model.Time, m func (d *Distributor) QueryStream(ctx context.Context, from, to model.Time, matchers ...*labels.Matcher) (*ingester_client.QueryStreamResponse, error) { var result *ingester_client.QueryStreamResponse err := instrument.CollectedRequest(ctx, "Distributor.QueryStream", d.queryDuration, instrument.ErrorCode, func(ctx context.Context) error { - userID, err := tenant.TenantID(ctx) - if err != nil { - return err - } - req, err := ingester_client.ToQueryRequest(from, to, matchers) if err != nil { return err @@ -101,7 +90,7 @@ func (d *Distributor) QueryStream(ctx context.Context, from, to model.Time, matc return err } - result, err = d.queryIngesterStream(ctx, userID, replicationSet, req) + result, err = d.queryIngesterStream(ctx, replicationSet, req) if err != nil { return err } @@ -290,10 +279,8 @@ func (d *Distributor) queryIngestersExemplars(ctx context.Context, replicationSe } // queryIngesterStream queries the ingesters using the new streaming API. 
-func (d *Distributor) queryIngesterStream(ctx context.Context, userID string, replicationSet ring.ReplicationSet, req *ingester_client.QueryRequest) (*ingester_client.QueryStreamResponse, error) { +func (d *Distributor) queryIngesterStream(ctx context.Context, replicationSet ring.ReplicationSet, req *ingester_client.QueryRequest) (*ingester_client.QueryStreamResponse, error) { var ( - chunksLimit = d.limits.MaxChunksPerQueryFromIngesters(userID) - chunksCount = atomic.Int32{} queryLimiter = limiter.QueryLimiterFromContextWithFallback(ctx) ) @@ -327,14 +314,9 @@ func (d *Distributor) queryIngesterStream(ctx context.Context, userID string, re } // Enforce the max chunks limits. - if chunksLimit > 0 { - if count := int(chunksCount.Add(int32(resp.ChunksCount()))); count > chunksLimit { - // We expect to be always able to convert the label matchers back to Prometheus ones. - // In case we fail (unexpected) the error will not include the matchers, but the core - // logic doesn't break. - matchers, _ := ingester_client.FromLabelMatchers(req.Matchers) - return nil, validation.LimitError(fmt.Sprintf(errMaxChunksPerQueryLimit, util.LabelMatchersToString(matchers), chunksLimit)) - } + matchers, _ := ingester_client.FromLabelMatchers(req.Matchers) + if chunkLimitErr := queryLimiter.AddChunks(resp.ChunksCount(), matchers); chunkLimitErr != nil { + return nil, validation.LimitError(chunkLimitErr.Error()) } for _, series := range resp.Chunkseries { diff --git a/pkg/querier/blocks_store_queryable.go b/pkg/querier/blocks_store_queryable.go index 79ca729db31..2d1fd41db5d 100644 --- a/pkg/querier/blocks_store_queryable.go +++ b/pkg/querier/blocks_store_queryable.go @@ -54,8 +54,7 @@ const ( ) var ( - errNoStoreGatewayAddress = errors.New("no store-gateway address configured") - errMaxChunksPerQueryLimit = "the query hit the max number of chunks limit while fetching chunks from store-gateways for %s (limit: %d)" + errNoStoreGatewayAddress = errors.New("no store-gateway address configured") ) // BlocksStoreSet is the interface used to get the clients to query series on a set of blocks. @@ -403,14 +402,11 @@ func (q *blocksStoreQuerier) selectSorted(sp *storage.SelectHints, matchers ...* resSeriesSets = []storage.SeriesSet(nil) resWarnings = storage.Warnings(nil) - maxChunksLimit = q.limits.MaxChunksPerQueryFromStore(q.userID) - leftChunksLimit = maxChunksLimit - resultMtx sync.Mutex ) queryFunc := func(clients map[BlocksStoreClient][]ulid.ULID, minT, maxT int64) ([]ulid.ULID, error) { - seriesSets, queriedBlocks, warnings, numChunks, err := q.fetchSeriesFromStores(spanCtx, sp, clients, minT, maxT, matchers, convertedMatchers, maxChunksLimit, leftChunksLimit) + seriesSets, queriedBlocks, warnings, _, err := q.fetchSeriesFromStores(spanCtx, sp, clients, minT, maxT, matchers, convertedMatchers) if err != nil { return nil, err } @@ -420,11 +416,6 @@ func (q *blocksStoreQuerier) selectSorted(sp *storage.SelectHints, matchers ...* resSeriesSets = append(resSeriesSets, seriesSets...) resWarnings = append(resWarnings, warnings...) - // Given a single block is guaranteed to not be queried twice, we can safely decrease the number of - // chunks we can still read before hitting the limit (max == 0 means disabled). 
- if maxChunksLimit > 0 { - leftChunksLimit -= numChunks - } resultMtx.Unlock() return queriedBlocks, nil @@ -552,8 +543,6 @@ func (q *blocksStoreQuerier) fetchSeriesFromStores( maxT int64, matchers []*labels.Matcher, convertedMatchers []storepb.LabelMatcher, - maxChunksLimit int, - leftChunksLimit int, ) ([]storage.SeriesSet, []ulid.ULID, storage.Warnings, int, error) { var ( reqCtx = grpc_metadata.AppendToOutgoingContext(ctx, cortex_tsdb.TenantIDExternalLabel, q.userID) @@ -620,12 +609,10 @@ func (q *blocksStoreQuerier) fetchSeriesFromStores( } // Ensure the max number of chunks limit hasn't been reached (max == 0 means disabled). - if maxChunksLimit > 0 { - actual := numChunks.Add(int32(len(s.Chunks))) - if actual > int32(leftChunksLimit) { - return validation.LimitError(fmt.Sprintf(errMaxChunksPerQueryLimit, util.LabelMatchersToString(matchers), maxChunksLimit)) - } + if chunkLimitErr := queryLimiter.AddChunks(len(s.Chunks), matchers); chunkLimitErr != nil { + return validation.LimitError(chunkLimitErr.Error()) } + chunksSize := 0 for _, c := range s.Chunks { chunksSize += c.Size() diff --git a/pkg/querier/blocks_store_queryable_test.go b/pkg/querier/blocks_store_queryable_test.go index 35d33ae9bb2..8b348f7e94c 100644 --- a/pkg/querier/blocks_store_queryable_test.go +++ b/pkg/querier/blocks_store_queryable_test.go @@ -51,7 +51,7 @@ func TestBlocksStoreQuerier_Select(t *testing.T) { metricNameLabel = labels.Label{Name: labels.MetricName, Value: metricName} series1Label = labels.Label{Name: "series", Value: "1"} series2Label = labels.Label{Name: "series", Value: "2"} - noOpQueryLimiter = limiter.NewQueryLimiter(0, 0) + noOpQueryLimiter = limiter.NewQueryLimiter(0, 0, 0) ) type valueResult struct { @@ -451,8 +451,8 @@ func TestBlocksStoreQuerier_Select(t *testing.T) { }, }, limits: &blocksStoreLimitsMock{maxChunksPerQuery: 1}, - queryLimiter: noOpQueryLimiter, - expectedErr: validation.LimitError(fmt.Sprintf(errMaxChunksPerQueryLimit, fmt.Sprintf("{__name__=%q}", metricName), 1)), + queryLimiter: limiter.NewQueryLimiter(0, 0, 1), + expectedErr: validation.LimitError(fmt.Sprintf(limiter.ErrMaxChunksPerQueryLimit, fmt.Sprintf("{__name__=%q}", metricName), 1)), }, "max chunks per query limit hit while fetching chunks during subsequent attempts": { finderResult: bucketindex.Blocks{ @@ -489,8 +489,8 @@ func TestBlocksStoreQuerier_Select(t *testing.T) { }, }, limits: &blocksStoreLimitsMock{maxChunksPerQuery: 3}, - queryLimiter: noOpQueryLimiter, - expectedErr: validation.LimitError(fmt.Sprintf(errMaxChunksPerQueryLimit, fmt.Sprintf("{__name__=%q}", metricName), 3)), + queryLimiter: limiter.NewQueryLimiter(0, 0, 3), + expectedErr: validation.LimitError(fmt.Sprintf(limiter.ErrMaxChunksPerQueryLimit, fmt.Sprintf("{__name__=%q}", metricName), 3)), }, "max series per query limit hit while fetching chunks": { finderResult: bucketindex.Blocks{ @@ -507,7 +507,7 @@ func TestBlocksStoreQuerier_Select(t *testing.T) { }, }, limits: &blocksStoreLimitsMock{}, - queryLimiter: limiter.NewQueryLimiter(1, 0), + queryLimiter: limiter.NewQueryLimiter(1, 0, 0), expectedErr: validation.LimitError(fmt.Sprintf(limiter.ErrMaxSeriesHit, 1)), }, "max chunk bytes per query limit hit while fetching chunks": { @@ -525,7 +525,7 @@ func TestBlocksStoreQuerier_Select(t *testing.T) { }, }, limits: &blocksStoreLimitsMock{maxChunksPerQuery: 1}, - queryLimiter: limiter.NewQueryLimiter(0, 8), + queryLimiter: limiter.NewQueryLimiter(0, 8, 0), expectedErr: validation.LimitError(fmt.Sprintf(limiter.ErrMaxChunkBytesHit, 8)), }, } diff 
--git a/pkg/querier/querier.go b/pkg/querier/querier.go index ba58c69d944..5449cf42df0 100644 --- a/pkg/querier/querier.go +++ b/pkg/querier/querier.go @@ -225,7 +225,7 @@ func NewQueryable(distributor QueryableWithFilter, stores []QueryableWithFilter, return nil, err } - ctx = limiter.AddQueryLimiterToContext(ctx, limiter.NewQueryLimiter(limits.MaxFetchedSeriesPerQuery(userID), limits.MaxFetchedChunkBytesPerQuery(userID))) + ctx = limiter.AddQueryLimiterToContext(ctx, limiter.NewQueryLimiter(limits.MaxFetchedSeriesPerQuery(userID), limits.MaxFetchedChunkBytesPerQuery(userID), limits.MaxChunksPerQueryFromStore(userID))) mint, maxt, err = validateQueryTimeRange(ctx, userID, mint, maxt, limits, cfg.MaxQueryIntoFuture) if err == errEmptyTimeRange { diff --git a/pkg/util/limiter/query_limiter.go b/pkg/util/limiter/query_limiter.go index 51a86d302f5..1fdd951d6d4 100644 --- a/pkg/util/limiter/query_limiter.go +++ b/pkg/util/limiter/query_limiter.go @@ -6,18 +6,21 @@ import ( "sync" "github.com/prometheus/common/model" + "github.com/prometheus/prometheus/pkg/labels" "go.uber.org/atomic" "github.com/cortexproject/cortex/pkg/cortexpb" "github.com/cortexproject/cortex/pkg/ingester/client" + "github.com/cortexproject/cortex/pkg/util" ) type queryLimiterCtxKey struct{} var ( - ctxKey = &queryLimiterCtxKey{} - ErrMaxSeriesHit = "the query hit the max number of series limit (limit: %d series)" - ErrMaxChunkBytesHit = "the query hit the aggregated chunks size limit (limit: %d bytes)" + ctxKey = &queryLimiterCtxKey{} + ErrMaxSeriesHit = "the query hit the max number of series limit (limit: %d series)" + ErrMaxChunkBytesHit = "the query hit the aggregated chunks size limit (limit: %d bytes)" + ErrMaxChunksPerQueryLimit = "the query hit the max number of chunks limit while fetching chunks from ingesters for %s (limit: %d)" ) type QueryLimiter struct { @@ -25,20 +28,23 @@ type QueryLimiter struct { uniqueSeries map[model.Fingerprint]struct{} chunkBytesCount atomic.Int64 + chunkCount atomic.Int64 maxSeriesPerQuery int maxChunkBytesPerQuery int + maxChunksPerQuery int } // NewQueryLimiter makes a new per-query limiter. Each query limiter // is configured using the `maxSeriesPerQuery` limit. 
-func NewQueryLimiter(maxSeriesPerQuery, maxChunkBytesPerQuery int) *QueryLimiter { +func NewQueryLimiter(maxSeriesPerQuery, maxChunkBytesPerQuery int, maxChunksPerQuery int) *QueryLimiter { return &QueryLimiter{ uniqueSeriesMx: sync.Mutex{}, uniqueSeries: map[model.Fingerprint]struct{}{}, maxSeriesPerQuery: maxSeriesPerQuery, maxChunkBytesPerQuery: maxChunkBytesPerQuery, + maxChunksPerQuery: maxChunksPerQuery, } } @@ -52,7 +58,7 @@ func QueryLimiterFromContextWithFallback(ctx context.Context) *QueryLimiter { ql, ok := ctx.Value(ctxKey).(*QueryLimiter) if !ok { // If there's no limiter return a new unlimited limiter as a fallback - ql = NewQueryLimiter(0, 0) + ql = NewQueryLimiter(0, 0, 0) } return ql } @@ -93,3 +99,14 @@ func (ql *QueryLimiter) AddChunkBytes(chunkSizeInBytes int) error { } return nil } + +func (ql *QueryLimiter) AddChunks(count int, matchers []*labels.Matcher) error { + if ql.maxChunksPerQuery == 0 { + return nil + } + + if ql.chunkCount.Add(int64(count)) > int64(ql.maxChunksPerQuery) { + return fmt.Errorf(fmt.Sprintf(ErrMaxChunksPerQueryLimit, util.LabelMatchersToString(matchers), ql.maxChunksPerQuery)) + } + return nil +} diff --git a/pkg/util/limiter/query_limiter_test.go b/pkg/util/limiter/query_limiter_test.go index 0456a804eed..1440a06e2f0 100644 --- a/pkg/util/limiter/query_limiter_test.go +++ b/pkg/util/limiter/query_limiter_test.go @@ -25,7 +25,7 @@ func TestQueryLimiter_AddSeries_ShouldReturnNoErrorOnLimitNotExceeded(t *testing labels.MetricName: metricName + "_2", "series2": "1", }) - limiter = NewQueryLimiter(100, 0) + limiter = NewQueryLimiter(100, 0, 0) ) err := limiter.AddSeries(cortexpb.FromLabelsToLabelAdapters(series1)) assert.NoError(t, err) @@ -53,7 +53,7 @@ func TestQueryLimiter_AddSeriers_ShouldReturnErrorOnLimitExceeded(t *testing.T) labels.MetricName: metricName + "_2", "series2": "1", }) - limiter = NewQueryLimiter(1, 0) + limiter = NewQueryLimiter(1, 0, 0) ) err := limiter.AddSeries(cortexpb.FromLabelsToLabelAdapters(series1)) require.NoError(t, err) @@ -62,7 +62,7 @@ func TestQueryLimiter_AddSeriers_ShouldReturnErrorOnLimitExceeded(t *testing.T) } func TestQueryLimiter_AddChunkBytes(t *testing.T) { - var limiter = NewQueryLimiter(0, 100) + var limiter = NewQueryLimiter(0, 100, 0) err := limiter.AddChunkBytes(100) require.NoError(t, err) @@ -84,7 +84,7 @@ func BenchmarkQueryLimiter_AddSeries(b *testing.B) { } b.ResetTimer() - limiter := NewQueryLimiter(b.N+1, 0) + limiter := NewQueryLimiter(b.N+1, 0, 0) for _, s := range series { err := limiter.AddSeries(cortexpb.FromLabelsToLabelAdapters(s)) assert.NoError(b, err) diff --git a/pkg/util/validation/limits.go b/pkg/util/validation/limits.go index 76a1cf0a4a6..d7018aab823 100644 --- a/pkg/util/validation/limits.go +++ b/pkg/util/validation/limits.go @@ -151,7 +151,7 @@ func (l *Limits) RegisterFlags(f *flag.FlagSet) { f.IntVar(&l.MaxGlobalMetricsWithMetadataPerUser, "ingester.max-global-metadata-per-user", 0, "The maximum number of active metrics with metadata per user, across the cluster. 0 to disable. Supported only if -distributor.shard-by-all-labels is true.") f.IntVar(&l.MaxGlobalMetadataPerMetric, "ingester.max-global-metadata-per-metric", 0, "The maximum number of metadata per metric, across the cluster. 0 to disable.") f.IntVar(&l.MaxChunksPerQueryFromStore, "store.query-chunk-limit", 2e6, "Deprecated. Use -querier.max-fetched-chunks-per-query CLI flag and its respective YAML config option instead. Maximum number of chunks that can be fetched in a single query. 
This limit is enforced when fetching chunks from the long-term storage only. When running the Cortex chunks storage, this limit is enforced in the querier and ruler, while when running the Cortex blocks storage this limit is enforced in the querier, ruler and store-gateway. 0 to disable.") - f.IntVar(&l.MaxChunksPerQuery, "querier.max-fetched-chunks-per-query", 0, "Maximum number of chunks that can be fetched in a single query from ingesters and long-term storage: the total number of actual fetched chunks could be 2x the limit, being independently applied when querying ingesters and long-term storage. This limit is enforced in the ingester (if chunks streaming is enabled), querier, ruler and store-gateway. Takes precedence over the deprecated -store.query-chunk-limit. 0 to disable.") + f.IntVar(&l.MaxChunksPerQuery, "querier.max-fetched-chunks-per-query", 0, "Maximum number of chunks that can be fetched in a single query from ingesters and long-term storage. This limit is enforced in the ingester (if chunks streaming is enabled), querier, ruler and store-gateway. Takes precedence over the deprecated -store.query-chunk-limit. 0 to disable.") f.IntVar(&l.MaxFetchedSeriesPerQuery, "querier.max-fetched-series-per-query", 0, "The maximum number of unique series for which a query can fetch samples from each ingesters and blocks storage. This limit is enforced in the querier only when running Cortex with blocks storage. 0 to disable") f.IntVar(&l.MaxFetchedChunkBytesPerQuery, "querier.max-fetched-chunk-bytes-per-query", 0, "The maximum size of all chunks in bytes that a query can fetch from each ingester and storage. This limit is enforced in the querier and ruler only when running Cortex with blocks storage. 0 to disable.") f.Var(&l.MaxQueryLength, "store.max-query-length", "Limit the query time range (end - start time). This limit is enforced in the query-frontend (on the received query), in the querier (on the query possibly split by the query-frontend) and in the chunks storage. 0 to disable.") @@ -398,12 +398,6 @@ func (o *Overrides) MaxChunksPerQueryFromStore(userID string) int { return o.getOverridesForUser(userID).MaxChunksPerQueryFromStore } -// MaxChunksPerQueryFromIngesters returns the maximum number of chunks allowed per query when fetching -// chunks from ingesters. -func (o *Overrides) MaxChunksPerQueryFromIngesters(userID string) int { - return o.getOverridesForUser(userID).MaxChunksPerQuery -} - // MaxFetchedSeriesPerQuery returns the maximum number of series allowed per query when fetching // chunks from ingesters and blocks storage. func (o *Overrides) MaxFetchedSeriesPerQuery(userID string) int { From 69451343a905cee2d2b03545e2d51e4821dbb8e1 Mon Sep 17 00:00:00 2001 From: Alan Protasio Date: Fri, 4 Jun 2021 13:29:37 -0700 Subject: [PATCH 2/4] Addressing comments Signed-off-by: Alan Protasio --- pkg/distributor/query.go | 3 +- pkg/querier/blocks_store_queryable.go | 26 ++++++++-- pkg/querier/blocks_store_queryable_test.go | 60 +++++++++++++++++++++- pkg/querier/querier.go | 2 +- pkg/util/limiter/query_limiter.go | 8 ++- pkg/util/validation/limits.go | 4 ++ 6 files changed, 88 insertions(+), 15 deletions(-) diff --git a/pkg/distributor/query.go b/pkg/distributor/query.go index abd69f1da20..8619a071258 100644 --- a/pkg/distributor/query.go +++ b/pkg/distributor/query.go @@ -314,8 +314,7 @@ func (d *Distributor) queryIngesterStream(ctx context.Context, replicationSet ri } // Enforce the max chunks limits. 
- matchers, _ := ingester_client.FromLabelMatchers(req.Matchers) - if chunkLimitErr := queryLimiter.AddChunks(resp.ChunksCount(), matchers); chunkLimitErr != nil { + if chunkLimitErr := queryLimiter.AddChunks(resp.ChunksCount()); chunkLimitErr != nil { return nil, validation.LimitError(chunkLimitErr.Error()) } diff --git a/pkg/querier/blocks_store_queryable.go b/pkg/querier/blocks_store_queryable.go index 2d1fd41db5d..98cb61e1401 100644 --- a/pkg/querier/blocks_store_queryable.go +++ b/pkg/querier/blocks_store_queryable.go @@ -54,7 +54,8 @@ const ( ) var ( - errNoStoreGatewayAddress = errors.New("no store-gateway address configured") + errNoStoreGatewayAddress = errors.New("no store-gateway address configured") + errMaxChunksPerQueryLimit = "the query hit the max number of chunks limit while fetching chunks from store-gateways for %s (limit: %d)" ) // BlocksStoreSet is the interface used to get the clients to query series on a set of blocks. @@ -402,11 +403,14 @@ func (q *blocksStoreQuerier) selectSorted(sp *storage.SelectHints, matchers ...* resSeriesSets = []storage.SeriesSet(nil) resWarnings = storage.Warnings(nil) + maxChunksLimit = q.limits.MaxChunksPerQueryFromStore(q.userID) + leftChunksLimit = maxChunksLimit + resultMtx sync.Mutex ) queryFunc := func(clients map[BlocksStoreClient][]ulid.ULID, minT, maxT int64) ([]ulid.ULID, error) { - seriesSets, queriedBlocks, warnings, _, err := q.fetchSeriesFromStores(spanCtx, sp, clients, minT, maxT, matchers, convertedMatchers) + seriesSets, queriedBlocks, warnings, numChunks, err := q.fetchSeriesFromStores(spanCtx, sp, clients, minT, maxT, matchers, convertedMatchers, maxChunksLimit, leftChunksLimit) if err != nil { return nil, err } @@ -416,6 +420,11 @@ func (q *blocksStoreQuerier) selectSorted(sp *storage.SelectHints, matchers ...* resSeriesSets = append(resSeriesSets, seriesSets...) resWarnings = append(resWarnings, warnings...) + // Given a single block is guaranteed to not be queried twice, we can safely decrease the number of + // chunks we can still read before hitting the limit (max == 0 means disabled). + if maxChunksLimit > 0 { + leftChunksLimit -= numChunks + } resultMtx.Unlock() return queriedBlocks, nil @@ -543,6 +552,8 @@ func (q *blocksStoreQuerier) fetchSeriesFromStores( maxT int64, matchers []*labels.Matcher, convertedMatchers []storepb.LabelMatcher, + maxChunksLimit int, + leftChunksLimit int, ) ([]storage.SeriesSet, []ulid.ULID, storage.Warnings, int, error) { var ( reqCtx = grpc_metadata.AppendToOutgoingContext(ctx, cortex_tsdb.TenantIDExternalLabel, q.userID) @@ -609,10 +620,12 @@ func (q *blocksStoreQuerier) fetchSeriesFromStores( } // Ensure the max number of chunks limit hasn't been reached (max == 0 means disabled). 
- if chunkLimitErr := queryLimiter.AddChunks(len(s.Chunks), matchers); chunkLimitErr != nil { - return validation.LimitError(chunkLimitErr.Error()) + if maxChunksLimit > 0 { + actual := numChunks.Add(int32(len(s.Chunks))) + if actual > int32(leftChunksLimit) { + return validation.LimitError(fmt.Sprintf(errMaxChunksPerQueryLimit, util.LabelMatchersToString(matchers), maxChunksLimit)) + } } - chunksSize := 0 for _, c := range s.Chunks { chunksSize += c.Size() @@ -620,6 +633,9 @@ func (q *blocksStoreQuerier) fetchSeriesFromStores( if chunkBytesLimitErr := queryLimiter.AddChunkBytes(chunksSize); chunkBytesLimitErr != nil { return validation.LimitError(chunkBytesLimitErr.Error()) } + if chunkLimitErr := queryLimiter.AddChunks(len(s.Chunks)); chunkLimitErr != nil { + return validation.LimitError(chunkLimitErr.Error()) + } } if w := resp.GetWarning(); w != "" { diff --git a/pkg/querier/blocks_store_queryable_test.go b/pkg/querier/blocks_store_queryable_test.go index 8b348f7e94c..40e868be4a2 100644 --- a/pkg/querier/blocks_store_queryable_test.go +++ b/pkg/querier/blocks_store_queryable_test.go @@ -451,8 +451,26 @@ func TestBlocksStoreQuerier_Select(t *testing.T) { }, }, limits: &blocksStoreLimitsMock{maxChunksPerQuery: 1}, + queryLimiter: noOpQueryLimiter, + expectedErr: validation.LimitError(fmt.Sprintf(errMaxChunksPerQueryLimit, fmt.Sprintf("{__name__=%q}", metricName), 1)), + }, + "max chunks per query limit hit while fetching chunks at first attempt - global limit": { + finderResult: bucketindex.Blocks{ + {ID: block1}, + {ID: block2}, + }, + storeSetResponses: []interface{}{ + map[BlocksStoreClient][]ulid.ULID{ + &storeGatewayClientMock{remoteAddr: "1.1.1.1", mockedSeriesResponses: []*storepb.SeriesResponse{ + mockSeriesResponse(labels.Labels{metricNameLabel, series1Label}, minT, 1), + mockSeriesResponse(labels.Labels{metricNameLabel, series1Label}, minT+1, 2), + mockHintsResponse(block1, block2), + }}: {block1, block2}, + }, + }, + limits: &blocksStoreLimitsMock{}, queryLimiter: limiter.NewQueryLimiter(0, 0, 1), - expectedErr: validation.LimitError(fmt.Sprintf(limiter.ErrMaxChunksPerQueryLimit, fmt.Sprintf("{__name__=%q}", metricName), 1)), + expectedErr: validation.LimitError(fmt.Sprintf(limiter.ErrMaxChunksPerQueryLimit, 1)), }, "max chunks per query limit hit while fetching chunks during subsequent attempts": { finderResult: bucketindex.Blocks{ @@ -489,8 +507,46 @@ func TestBlocksStoreQuerier_Select(t *testing.T) { }, }, limits: &blocksStoreLimitsMock{maxChunksPerQuery: 3}, + queryLimiter: noOpQueryLimiter, + expectedErr: validation.LimitError(fmt.Sprintf(errMaxChunksPerQueryLimit, fmt.Sprintf("{__name__=%q}", metricName), 3)), + }, + "max chunks per query limit hit while fetching chunks during subsequent attempts - global": { + finderResult: bucketindex.Blocks{ + {ID: block1}, + {ID: block2}, + {ID: block3}, + {ID: block4}, + }, + storeSetResponses: []interface{}{ + // First attempt returns a client whose response does not include all expected blocks. + map[BlocksStoreClient][]ulid.ULID{ + &storeGatewayClientMock{remoteAddr: "1.1.1.1", mockedSeriesResponses: []*storepb.SeriesResponse{ + mockSeriesResponse(labels.Labels{metricNameLabel, series1Label}, minT, 1), + mockHintsResponse(block1), + }}: {block1, block3}, + &storeGatewayClientMock{remoteAddr: "2.2.2.2", mockedSeriesResponses: []*storepb.SeriesResponse{ + mockSeriesResponse(labels.Labels{metricNameLabel, series2Label}, minT, 2), + mockHintsResponse(block2), + }}: {block2, block4}, + }, + // Second attempt returns 1 missing block. 
+ map[BlocksStoreClient][]ulid.ULID{ + &storeGatewayClientMock{remoteAddr: "3.3.3.3", mockedSeriesResponses: []*storepb.SeriesResponse{ + mockSeriesResponse(labels.Labels{metricNameLabel, series1Label}, minT+1, 2), + mockHintsResponse(block3), + }}: {block3, block4}, + }, + // Third attempt returns the last missing block. + map[BlocksStoreClient][]ulid.ULID{ + &storeGatewayClientMock{remoteAddr: "4.4.4.4", mockedSeriesResponses: []*storepb.SeriesResponse{ + mockSeriesResponse(labels.Labels{metricNameLabel, series2Label}, minT+1, 3), + mockHintsResponse(block4), + }}: {block4}, + }, + }, + limits: &blocksStoreLimitsMock{}, queryLimiter: limiter.NewQueryLimiter(0, 0, 3), - expectedErr: validation.LimitError(fmt.Sprintf(limiter.ErrMaxChunksPerQueryLimit, fmt.Sprintf("{__name__=%q}", metricName), 3)), + expectedErr: validation.LimitError(fmt.Sprintf(limiter.ErrMaxChunksPerQueryLimit, 3)), }, "max series per query limit hit while fetching chunks": { finderResult: bucketindex.Blocks{ diff --git a/pkg/querier/querier.go b/pkg/querier/querier.go index 5449cf42df0..cb3b35f32e0 100644 --- a/pkg/querier/querier.go +++ b/pkg/querier/querier.go @@ -225,7 +225,7 @@ func NewQueryable(distributor QueryableWithFilter, stores []QueryableWithFilter, return nil, err } - ctx = limiter.AddQueryLimiterToContext(ctx, limiter.NewQueryLimiter(limits.MaxFetchedSeriesPerQuery(userID), limits.MaxFetchedChunkBytesPerQuery(userID), limits.MaxChunksPerQueryFromStore(userID))) + ctx = limiter.AddQueryLimiterToContext(ctx, limiter.NewQueryLimiter(limits.MaxFetchedSeriesPerQuery(userID), limits.MaxFetchedChunkBytesPerQuery(userID), limits.MaxChunksPerQuery(userID))) mint, maxt, err = validateQueryTimeRange(ctx, userID, mint, maxt, limits, cfg.MaxQueryIntoFuture) if err == errEmptyTimeRange { diff --git a/pkg/util/limiter/query_limiter.go b/pkg/util/limiter/query_limiter.go index 1fdd951d6d4..b5c33c62efb 100644 --- a/pkg/util/limiter/query_limiter.go +++ b/pkg/util/limiter/query_limiter.go @@ -6,12 +6,10 @@ import ( "sync" "github.com/prometheus/common/model" - "github.com/prometheus/prometheus/pkg/labels" "go.uber.org/atomic" "github.com/cortexproject/cortex/pkg/cortexpb" "github.com/cortexproject/cortex/pkg/ingester/client" - "github.com/cortexproject/cortex/pkg/util" ) type queryLimiterCtxKey struct{} @@ -20,7 +18,7 @@ var ( ctxKey = &queryLimiterCtxKey{} ErrMaxSeriesHit = "the query hit the max number of series limit (limit: %d series)" ErrMaxChunkBytesHit = "the query hit the aggregated chunks size limit (limit: %d bytes)" - ErrMaxChunksPerQueryLimit = "the query hit the max number of chunks limit while fetching chunks from ingesters for %s (limit: %d)" + ErrMaxChunksPerQueryLimit = "the query hit the max number of chunks limit (limit: %d chunks)" ) type QueryLimiter struct { @@ -100,13 +98,13 @@ func (ql *QueryLimiter) AddChunkBytes(chunkSizeInBytes int) error { return nil } -func (ql *QueryLimiter) AddChunks(count int, matchers []*labels.Matcher) error { +func (ql *QueryLimiter) AddChunks(count int) error { if ql.maxChunksPerQuery == 0 { return nil } if ql.chunkCount.Add(int64(count)) > int64(ql.maxChunksPerQuery) { - return fmt.Errorf(fmt.Sprintf(ErrMaxChunksPerQueryLimit, util.LabelMatchersToString(matchers), ql.maxChunksPerQuery)) + return fmt.Errorf(fmt.Sprintf(ErrMaxChunksPerQueryLimit, ql.maxChunksPerQuery)) } return nil } diff --git a/pkg/util/validation/limits.go b/pkg/util/validation/limits.go index d7018aab823..1355da91d74 100644 --- a/pkg/util/validation/limits.go +++ b/pkg/util/validation/limits.go @@ 
-398,6 +398,10 @@ func (o *Overrides) MaxChunksPerQueryFromStore(userID string) int {
 	return o.getOverridesForUser(userID).MaxChunksPerQueryFromStore
 }

+func (o *Overrides) MaxChunksPerQuery(userID string) int {
+	return o.getOverridesForUser(userID).MaxChunksPerQuery
+}
+
 // MaxFetchedSeriesPerQuery returns the maximum number of series allowed per query when fetching
 // chunks from ingesters and blocks storage.
 func (o *Overrides) MaxFetchedSeriesPerQuery(userID string) int {

From 1cf48021494a057a1939d0f607a97301ac353fea Mon Sep 17 00:00:00 2001
From: Alan Protasio
Date: Wed, 23 Jun 2021 18:37:24 -0700
Subject: [PATCH 3/4] Addressing comments - 2

Signed-off-by: Alan Protasio
---
 CHANGELOG.md                                | 4 ++++
 docs/configuration/config-file-reference.md | 6 +++---
 pkg/util/validation/limits.go               | 2 +-
 3 files changed, 8 insertions(+), 4 deletions(-)

diff --git a/CHANGELOG.md b/CHANGELOG.md
index c346292e3a9..82ddfce2812 100644
--- a/CHANGELOG.md
+++ b/CHANGELOG.md
@@ -11,6 +11,7 @@
   - `-alertmanager.receivers-firewall.block.private-addresses` renamed to `-alertmanager.receivers-firewall-block-private-addresses`
 * [CHANGE] Change default value of `-server.grpc.keepalive.min-time-between-pings` to `10s` and `-server.grpc.keepalive.ping-without-stream-allowed` to `true`. #4168
 * [CHANGE] Ingester: Change default value of `-ingester.active-series-metrics-enabled` to `true`. This incurs a small increase in memory usage, between 1.2% and 1.6% as measured on ingesters with 1.3M active series. #4257
+* [CHANGE] Querier / ruler: Change `-querier.max-fetched-chunks-per-query` configuration to limit the maximum number of chunks that can be fetched in a single query. The number of chunks fetched by ingesters AND long-term storage combined should not exceed the value configured via `-querier.max-fetched-chunks-per-query`
 * [FEATURE] Querier: Added new `-querier.max-fetched-series-per-query` flag. When Cortex is running with blocks storage, the max series per query limit is enforced in the querier and applies to unique series received from ingesters and store-gateway (long-term storage). #4179
 * [FEATURE] Querier/Ruler: Added new `-querier.max-fetched-chunk-bytes-per-query` flag. When Cortex is running with blocks storage, the max chunk bytes limit is enforced in the querier and ruler and limits the size of all aggregated chunks returned from ingesters and storage as bytes for a query. #4216
 * [FEATURE] Alertmanager: Added rate-limits to notifiers. Rate limits used by all integrations can be configured using `-alertmanager.notification-rate-limit`, while per-integration rate limits can be specified via `-alertmanager.notification-rate-limit-per-integration` parameter. Both shared and per-integration limits can be overwritten using overrides mechanism. These limits are applied on individual (per-tenant) alertmanagers. Rate-limited notifications are failed notifications. It is possible to monitor rate-limited notifications via new `cortex_alertmanager_notification_rate_limited_total` metric. #4135 #4163
@@ -59,10 +60,13 @@
 * [BUGFIX] Fixed cache fetch error on Redis Cluster. #4056
 * [BUGFIX] Ingester: fix issue where runtime limits erroneously override default limits. #4246
 * [BUGFIX] Ruler: fix startup in single-binary mode when the new `ruler_storage` is used. #4252
+<<<<<<< HEAD
 * [BUGFIX] Querier: fix queries failing with "at least 1 healthy replica required, could only find 0" error right after scaling up store-gateways until they're ACTIVE in the ring.
#4263
 * [BUGFIX] Store-gateway: when blocks sharding is enabled, do not load all blocks in each store-gateway in case of a cold startup, but load only blocks owned by the store-gateway replica. #4271
 * [BUGFIX] Memberlist: fix to setting the default configuration value for `-memberlist.retransmit-factor` when not provided. This should improve propagation delay of the ring state (including, but not limited to, tombstones). Note that if the configuration is already explicitly given, this fix has no effect. #4269
 * [BUGFIX] Querier: Fix issue where samples in a chunk might get skipped by batch iterator. #4218
+=======
+>>>>>>> 723e6e847 (Addressing comments - 2)

 ## Blocksconvert

 * [ENHANCEMENT] Scanner: add support for DynamoDB (v9 schema only). #3828

diff --git a/docs/configuration/config-file-reference.md b/docs/configuration/config-file-reference.md
index d62d5bb898a..9b20d81bea4 100644
--- a/docs/configuration/config-file-reference.md
+++ b/docs/configuration/config-file-reference.md
@@ -4043,9 +4043,9 @@ The `limits_config` configures default and per-tenant limits imposed by Cortex s
 [max_chunks_per_query: <int> | default = 2000000]

 # Maximum number of chunks that can be fetched in a single query from ingesters
-# and long-term storage. This limit is enforced in the ingester (if chunks
-# streaming is enabled), querier, ruler and store-gateway. Takes precedence over
-# the deprecated -store.query-chunk-limit. 0 to disable.
+# and long-term storage. This limit is enforced in the querier, ruler and
+# store-gateway. Takes precedence over the deprecated -store.query-chunk-limit.
+# 0 to disable.
 # CLI flag: -querier.max-fetched-chunks-per-query
 [max_fetched_chunks_per_query: <int> | default = 0]

diff --git a/pkg/util/validation/limits.go b/pkg/util/validation/limits.go
index 1355da91d74..506e6f6fffa 100644
--- a/pkg/util/validation/limits.go
+++ b/pkg/util/validation/limits.go
@@ -151,7 +151,7 @@ func (l *Limits) RegisterFlags(f *flag.FlagSet) {
 	f.IntVar(&l.MaxGlobalMetricsWithMetadataPerUser, "ingester.max-global-metadata-per-user", 0, "The maximum number of active metrics with metadata per user, across the cluster. 0 to disable. Supported only if -distributor.shard-by-all-labels is true.")
 	f.IntVar(&l.MaxGlobalMetadataPerMetric, "ingester.max-global-metadata-per-metric", 0, "The maximum number of metadata per metric, across the cluster. 0 to disable.")
 	f.IntVar(&l.MaxChunksPerQueryFromStore, "store.query-chunk-limit", 2e6, "Deprecated. Use -querier.max-fetched-chunks-per-query CLI flag and its respective YAML config option instead. Maximum number of chunks that can be fetched in a single query. This limit is enforced when fetching chunks from the long-term storage only. When running the Cortex chunks storage, this limit is enforced in the querier and ruler, while when running the Cortex blocks storage this limit is enforced in the querier, ruler and store-gateway. 0 to disable.")
-	f.IntVar(&l.MaxChunksPerQuery, "querier.max-fetched-chunks-per-query", 0, "Maximum number of chunks that can be fetched in a single query from ingesters and long-term storage. This limit is enforced in the ingester (if chunks streaming is enabled), querier, ruler and store-gateway. Takes precedence over the deprecated -store.query-chunk-limit. 0 to disable.")
+	f.IntVar(&l.MaxChunksPerQuery, "querier.max-fetched-chunks-per-query", 0, "Maximum number of chunks that can be fetched in a single query from ingesters and long-term storage. This limit is enforced in the querier, ruler and store-gateway.
Takes precedence over the deprecated -store.query-chunk-limit. 0 to disable.")
 	f.IntVar(&l.MaxFetchedSeriesPerQuery, "querier.max-fetched-series-per-query", 0, "The maximum number of unique series for which a query can fetch samples from each ingesters and blocks storage. This limit is enforced in the querier only when running Cortex with blocks storage. 0 to disable")
 	f.IntVar(&l.MaxFetchedChunkBytesPerQuery, "querier.max-fetched-chunk-bytes-per-query", 0, "The maximum size of all chunks in bytes that a query can fetch from each ingester and storage. This limit is enforced in the querier and ruler only when running Cortex with blocks storage. 0 to disable.")
 	f.Var(&l.MaxQueryLength, "store.max-query-length", "Limit the query time range (end - start time). This limit is enforced in the query-frontend (on the received query), in the querier (on the query possibly split by the query-frontend) and in the chunks storage. 0 to disable.")

From 0483e5cd8c8d13828a017726b5767f332b37d574 Mon Sep 17 00:00:00 2001
From: Alan Protasio
Date: Thu, 24 Jun 2021 09:28:05 -0700
Subject: [PATCH 4/4] Addressing comments - 3

Signed-off-by: Alan Protasio
---
 CHANGELOG.md | 5 +----
 1 file changed, 1 insertion(+), 4 deletions(-)

diff --git a/CHANGELOG.md b/CHANGELOG.md
index 82ddfce2812..78a65ca7619 100644
--- a/CHANGELOG.md
+++ b/CHANGELOG.md
@@ -11,7 +11,7 @@
   - `-alertmanager.receivers-firewall.block.private-addresses` renamed to `-alertmanager.receivers-firewall-block-private-addresses`
 * [CHANGE] Change default value of `-server.grpc.keepalive.min-time-between-pings` to `10s` and `-server.grpc.keepalive.ping-without-stream-allowed` to `true`. #4168
 * [CHANGE] Ingester: Change default value of `-ingester.active-series-metrics-enabled` to `true`. This incurs a small increase in memory usage, between 1.2% and 1.6% as measured on ingesters with 1.3M active series. #4257
-* [CHANGE] Querier / ruler: Change `-querier.max-fetched-chunks-per-query` configuration to limit the maximum number of chunks that can be fetched in a single query. The number of chunks fetched by ingesters AND long-term storage combined should not exceed the value configured via `-querier.max-fetched-chunks-per-query`
+* [CHANGE] Querier / ruler: Change `-querier.max-fetched-chunks-per-query` configuration to limit the maximum number of chunks that can be fetched in a single query. The number of chunks fetched by ingesters AND long-term storage combined should not exceed the value configured via `-querier.max-fetched-chunks-per-query`. #4260
 * [FEATURE] Querier: Added new `-querier.max-fetched-series-per-query` flag. When Cortex is running with blocks storage, the max series per query limit is enforced in the querier and applies to unique series received from ingesters and store-gateway (long-term storage). #4179
 * [FEATURE] Querier/Ruler: Added new `-querier.max-fetched-chunk-bytes-per-query` flag. When Cortex is running with blocks storage, the max chunk bytes limit is enforced in the querier and ruler and limits the size of all aggregated chunks returned from ingesters and storage as bytes for a query. #4216
 * [FEATURE] Alertmanager: Added rate-limits to notifiers. Rate limits used by all integrations can be configured using `-alertmanager.notification-rate-limit`, while per-integration rate limits can be specified via `-alertmanager.notification-rate-limit-per-integration` parameter. Both shared and per-integration limits can be overwritten using overrides mechanism. These limits are applied on individual (per-tenant) alertmanagers.
Rate-limited notifications are failed notifications. It is possible to monitor rate-limited notifications via new `cortex_alertmanager_notification_rate_limited_total` metric. #4135 #4163 @@ -60,13 +60,10 @@ * [BUGFIX] Fixed cache fetch error on Redis Cluster. #4056 * [BUGFIX] Ingester: fix issue where runtime limits erroneously override default limits. #4246 * [BUGFIX] Ruler: fix startup in single-binary mode when the new `ruler_storage` is used. #4252 -<<<<<<< HEAD * [BUGFIX] Querier: fix queries failing with "at least 1 healthy replica required, could only find 0" error right after scaling up store-gateways until they're ACTIVE in the ring. #4263 * [BUGFIX] Store-gateway: when blocks sharding is enabled, do not load all blocks in each store-gateway in case of a cold startup, but load only blocks owned by the store-gateway replica. #4271 * [BUGFIX] Memberlist: fix to setting the default configuration value for `-memberlist.retransmit-factor` when not provided. This should improve propagation delay of the ring state (including, but not limited to, tombstones). Note that if the configuration is already explicitly given, this fix has no effect. #4269 * [BUGFIX] Querier: Fix issue where samples in a chunk might get skipped by batch iterator. #4218 -======= ->>>>>>> 723e6e847 (Addressing comments - 2) ## Blocksconvert * [ENHANCEMENT] Scanner: add support for DynamoDB (v9 schema only). #3828
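
For reference, a minimal sketch of how the per-query limiter that this series converges on is meant to be wired: the querier creates one QueryLimiter per query with the three limits (max series, max chunk bytes, max chunks), stores it in the request context, and both fetch paths (distributor/ingesters and store-gateways) count chunks against the same shared budget. This is illustrative only and not part of the patches; the import path and the two-path scenario are assumptions based on the diffs above (NewQueryLimiter, AddQueryLimiterToContext, QueryLimiterFromContextWithFallback and the post-PATCH-2/4 AddChunks signature), and the limit values are arbitrary.

package main

import (
	"context"
	"fmt"

	// Per-query limiter package touched by this series (pkg/util/limiter).
	"github.com/cortexproject/cortex/pkg/util/limiter"
)

func main() {
	// 0 disables the series and chunk-bytes limits; allow at most 5 chunks
	// in total for this query.
	ql := limiter.NewQueryLimiter(0, 0, 5)
	ctx := limiter.AddQueryLimiterToContext(context.Background(), ql)

	// Ingester path: 3 chunks fit within the shared budget, so no error is
	// returned and nothing is printed.
	if err := limiter.QueryLimiterFromContextWithFallback(ctx).AddChunks(3); err != nil {
		fmt.Println("ingester path:", err)
	}

	// Store-gateway path: 3 more chunks push the shared count to 6 > 5, so
	// AddChunks returns the ErrMaxChunksPerQueryLimit error.
	if err := limiter.QueryLimiterFromContextWithFallback(ctx).AddChunks(3); err != nil {
		fmt.Println("store-gateway path:", err)
	}
}

Because both paths draw from one limiter stored in the context, the "total fetched chunks could be 2x the limit" caveat is dropped from the flag documentation above: the budget is shared rather than applied independently to ingesters and long-term storage.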