diff --git a/CHANGELOG.md b/CHANGELOG.md index 89b87452dbb..1bf1177932f 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -42,7 +42,8 @@ * [ENHANCEMENT] Store Gateway/Querier/Compactor: Handling CMK Access Denied errors. #5420 #5442 #5446 * [ENHANCEMENT] Store Gateway: Implementing multi level index cache. #5451 * [ENHANCEMENT] Alertmanager: Add the alert name in error log when it get throttled. #5456 * [ENHANCEMENT] Querier: Retry store gateway on different zones when zone awareness is enabled. #5476 +* [ENHANCEMENT] DDBKV: Change metric name from dynamodb_kv_read_capacity_total to dynamodb_kv_consumed_capacity_total and include Delete, Put, Batch dimension. #5481 * [BUGFIX] Ruler: Validate if rule group can be safely converted back to rule group yaml from protobuf message #5265 * [BUGFIX] Querier: Convert gRPC `ResourceExhausted` status code from store gateway to 422 limit error. #5286 * [BUGFIX] Alertmanager: Route web-ui requests to the alertmanager distributor when sharding is enabled. #5293 diff --git a/integration/querier_test.go b/integration/querier_test.go index f5b9d744fdb..acbc9dc97d8 100644 --- a/integration/querier_test.go +++ b/integration/querier_test.go @@ -297,11 +297,11 @@ func TestQuerierWithBlocksStorageRunningInMicroservicesMode(t *testing.T) { } require.NoError(t, storeGateways.WaitSumMetrics(e2e.Equals(2), "thanos_store_index_cache_hits_total")) // this time has used the index cache - if strings.Contains(testCfg.indexCacheBackend, tsdb.IndexCacheBackendInMemory) { + if testCfg.indexCacheBackend == tsdb.IndexCacheBackendInMemory { require.NoError(t, storeGateways.WaitSumMetrics(e2e.Equals(9), "thanos_store_index_cache_items")) // as before require.NoError(t, storeGateways.WaitSumMetrics(e2e.Equals(9), "thanos_store_index_cache_items_added_total")) // as before } - if strings.Contains(testCfg.indexCacheBackend, tsdb.IndexCacheBackendMemcached) { + if testCfg.indexCacheBackend == tsdb.IndexCacheBackendMemcached { require.NoError(t, storeGateways.WaitSumMetrics(e2e.Equals(23-l0CacheHits), "thanos_memcached_operations_total")) // as before + 2 gets - cache hits } diff --git a/pkg/cortex/modules.go b/pkg/cortex/modules.go index f72b182489e..180e5c73904 100644 --- a/pkg/cortex/modules.go +++ b/pkg/cortex/modules.go @@ -451,11 +451,10 @@ func (t *Cortex) initDeleteRequestsStore() (serv services.Service, err error) { // to optimize Prometheus query requests. func (t *Cortex) initQueryFrontendTripperware() (serv services.Service, err error) { queryAnalyzer := querysharding.NewQueryAnalyzer() - defaultSubQueryInterval := t.Cfg.Querier.DefaultEvaluationInterval // PrometheusCodec is a codec to encode and decode Prometheus query range requests and responses. 
- prometheusCodec := queryrange.NewPrometheusCodec(false, defaultSubQueryInterval) + prometheusCodec := queryrange.NewPrometheusCodec(false) // ShardedPrometheusCodec is same as PrometheusCodec but to be used on the sharded queries (it sum up the stats) - shardedPrometheusCodec := queryrange.NewPrometheusCodec(true, defaultSubQueryInterval) + shardedPrometheusCodec := queryrange.NewPrometheusCodec(true) queryRangeMiddlewares, cache, err := queryrange.Middlewares( t.Cfg.QueryRange, @@ -486,7 +485,7 @@ func (t *Cortex) initQueryFrontendTripperware() (serv services.Service, err erro instantquery.InstantQueryCodec, t.Overrides, queryAnalyzer, - defaultSubQueryInterval, + t.Cfg.Querier.DefaultEvaluationInterval, ) return services.NewIdleService(nil, func(_ error) error { diff --git a/pkg/querier/tripperware/instantquery/instant_query.go b/pkg/querier/tripperware/instantquery/instant_query.go index a45da6dbbb3..58733ee79eb 100644 --- a/pkg/querier/tripperware/instantquery/instant_query.go +++ b/pkg/querier/tripperware/instantquery/instant_query.go @@ -108,8 +108,7 @@ func (r *PrometheusRequest) WithStats(stats string) tripperware.Request { type instantQueryCodec struct { tripperware.Codec - now func() time.Time - noStepSubQueryInterval time.Duration + now func() time.Time } func newInstantQueryCodec() instantQueryCodec { @@ -139,10 +138,6 @@ func (c instantQueryCodec) DecodeRequest(_ context.Context, r *http.Request, for } result.Query = r.FormValue("query") - if err := tripperware.SubQueryStepSizeCheck(result.Query, c.noStepSubQueryInterval, tripperware.MaxStep); err != nil { - return nil, err - } - result.Stats = r.FormValue("stats") result.Path = r.URL.Path diff --git a/pkg/querier/tripperware/instantquery/shard_by_query_test.go b/pkg/querier/tripperware/instantquery/shard_by_query_test.go index 50c83cb5eeb..0d4dfc41a95 100644 --- a/pkg/querier/tripperware/instantquery/shard_by_query_test.go +++ b/pkg/querier/tripperware/instantquery/shard_by_query_test.go @@ -2,7 +2,6 @@ package instantquery import ( "testing" - "time" "github.com/cortexproject/cortex/pkg/querier/tripperware" "github.com/cortexproject/cortex/pkg/querier/tripperware/queryrange" @@ -10,5 +9,5 @@ import ( func Test_shardQuery(t *testing.T) { t.Parallel() - tripperware.TestQueryShardQuery(t, InstantQueryCodec, queryrange.NewPrometheusCodec(true, time.Minute)) + tripperware.TestQueryShardQuery(t, InstantQueryCodec, queryrange.NewPrometheusCodec(true)) } diff --git a/pkg/querier/tripperware/queryrange/query_range.go b/pkg/querier/tripperware/queryrange/query_range.go index 704049e66a2..016a2e7ef7a 100644 --- a/pkg/querier/tripperware/queryrange/query_range.go +++ b/pkg/querier/tripperware/queryrange/query_range.go @@ -46,15 +46,10 @@ var ( type prometheusCodec struct { sharded bool - - noStepSubQueryInterval time.Duration } -func NewPrometheusCodec(sharded bool, noStepSubQueryInterval time.Duration) *prometheusCodec { //nolint:revive - return &prometheusCodec{ - sharded: sharded, - noStepSubQueryInterval: noStepSubQueryInterval, - } +func NewPrometheusCodec(sharded bool) *prometheusCodec { //nolint:revive + return &prometheusCodec{sharded: sharded} } // WithStartEnd clones the current `PrometheusRequest` with a new `start` and `end` timestamp. 
@@ -203,10 +198,6 @@ func (c prometheusCodec) DecodeRequest(_ context.Context, r *http.Request, forwa } result.Query = r.FormValue("query") - if err := tripperware.SubQueryStepSizeCheck(result.Query, c.noStepSubQueryInterval, tripperware.MaxStep); err != nil { - return nil, err - } - result.Stats = r.FormValue("stats") result.Path = r.URL.Path diff --git a/pkg/querier/tripperware/queryrange/query_range_middlewares_test.go b/pkg/querier/tripperware/queryrange/query_range_middlewares_test.go index 41821695230..5ade2abd522 100644 --- a/pkg/querier/tripperware/queryrange/query_range_middlewares_test.go +++ b/pkg/querier/tripperware/queryrange/query_range_middlewares_test.go @@ -20,8 +20,8 @@ import ( ) var ( - PrometheusCodec = NewPrometheusCodec(false, time.Minute) - ShardedPrometheusCodec = NewPrometheusCodec(false, time.Minute) + PrometheusCodec = NewPrometheusCodec(false) + ShardedPrometheusCodec = NewPrometheusCodec(false) ) func TestRoundTrip(t *testing.T) { diff --git a/pkg/querier/tripperware/queryrange/query_range_test.go b/pkg/querier/tripperware/queryrange/query_range_test.go index 8951015f00f..dccbde73962 100644 --- a/pkg/querier/tripperware/queryrange/query_range_test.go +++ b/pkg/querier/tripperware/queryrange/query_range_test.go @@ -61,10 +61,6 @@ func TestRequest(t *testing.T) { url: "api/v1/query_range?start=0&end=11001&step=1", expectedErr: errStepTooSmall, }, - { - url: "/api/v1/query?query=up%5B30d%3A%5D&start=123&end=456&step=10", - expectedErr: httpgrpc.Errorf(http.StatusBadRequest, tripperware.ErrSubQueryStepTooSmall, 11000), - }, } { tc := tc t.Run(tc.url, func(t *testing.T) { diff --git a/pkg/querier/tripperware/roundtrip.go b/pkg/querier/tripperware/roundtrip.go index 3aa6b1313f1..dc0a5c52712 100644 --- a/pkg/querier/tripperware/roundtrip.go +++ b/pkg/querier/tripperware/roundtrip.go @@ -142,15 +142,19 @@ func NewQueryTripperware( activeUsers.UpdateUserTimestamp(userStr, time.Now()) queriesPerTenant.WithLabelValues(op, userStr).Inc() - if isQueryRange { - return queryrange.RoundTrip(r) - } else if isQuery { - // If the given query is not shardable, use downstream roundtripper. + if isQuery || isQueryRange { query := r.FormValue("query") // Check subquery step size. if err := SubQueryStepSizeCheck(query, defaultSubQueryInterval, MaxStep); err != nil { return nil, err } + } + + if isQueryRange { + return queryrange.RoundTrip(r) + } else if isQuery { + // If the given query is not shardable, use downstream roundtripper. + query := r.FormValue("query") // If vertical sharding is not enabled for the tenant, use downstream roundtripper. 
numShards := validation.SmallestPositiveIntPerTenant(tenantIDs, limits.QueryVerticalShardSize) diff --git a/pkg/ring/kv/dynamodb/dynamodb.go b/pkg/ring/kv/dynamodb/dynamodb.go index 6d2e3d65c8c..1fc356a4934 100644 --- a/pkg/ring/kv/dynamodb/dynamodb.go +++ b/pkg/ring/kv/dynamodb/dynamodb.go @@ -157,28 +157,37 @@ func (kv dynamodbKV) Query(ctx context.Context, key dynamodbKey, isPrefix bool) return keys, totalCapacity, nil } -func (kv dynamodbKV) Delete(ctx context.Context, key dynamodbKey) error { +func (kv dynamodbKV) Delete(ctx context.Context, key dynamodbKey) (float64, error) { input := &dynamodb.DeleteItemInput{ TableName: kv.tableName, Key: generateItemKey(key), } - _, err := kv.ddbClient.DeleteItemWithContext(ctx, input) - return err + totalCapacity := float64(0) + output, err := kv.ddbClient.DeleteItemWithContext(ctx, input) + if err == nil { + totalCapacity = getCapacityUnits(output.ConsumedCapacity) + } + return totalCapacity, err } -func (kv dynamodbKV) Put(ctx context.Context, key dynamodbKey, data []byte) error { +func (kv dynamodbKV) Put(ctx context.Context, key dynamodbKey, data []byte) (float64, error) { input := &dynamodb.PutItemInput{ TableName: kv.tableName, Item: kv.generatePutItemRequest(key, data), } - _, err := kv.ddbClient.PutItemWithContext(ctx, input) - return err + totalCapacity := float64(0) + output, err := kv.ddbClient.PutItemWithContext(ctx, input) + if err == nil { + totalCapacity = getCapacityUnits(output.ConsumedCapacity) + } + return totalCapacity, err } -func (kv dynamodbKV) Batch(ctx context.Context, put map[dynamodbKey][]byte, delete []dynamodbKey) error { +func (kv dynamodbKV) Batch(ctx context.Context, put map[dynamodbKey][]byte, delete []dynamodbKey) (float64, error) { + totalCapacity := float64(0) writeRequestSize := len(put) + len(delete) if writeRequestSize == 0 { - return nil + return totalCapacity, nil } writeRequestsSlices := make([][]*dynamodb.WriteRequest, int(math.Ceil(float64(writeRequestSize)/float64(DdbBatchSizeLimit)))) @@ -220,15 +229,18 @@ func (kv dynamodbKV) Batch(ctx context.Context, put map[dynamodbKey][]byte, dele resp, err := kv.ddbClient.BatchWriteItemWithContext(ctx, input) if err != nil { - return err + return totalCapacity, err + } + for _, consumedCapacity := range resp.ConsumedCapacity { + totalCapacity += getCapacityUnits(consumedCapacity) } if resp.UnprocessedItems != nil && len(resp.UnprocessedItems) > 0 { - return fmt.Errorf("error processing batch request for %s requests", resp.UnprocessedItems) + return totalCapacity, fmt.Errorf("error processing batch request for %s requests", resp.UnprocessedItems) } } - return nil + return totalCapacity, nil } func (kv dynamodbKV) generatePutItemRequest(key dynamodbKey, data []byte) map[string]*dynamodb.AttributeValue { diff --git a/pkg/ring/kv/dynamodb/dynamodb_test.go b/pkg/ring/kv/dynamodb/dynamodb_test.go index b7e04c1164b..d69ef5707de 100644 --- a/pkg/ring/kv/dynamodb/dynamodb_test.go +++ b/pkg/ring/kv/dynamodb/dynamodb_test.go @@ -23,7 +23,7 @@ func Test_TTLDisabled(t *testing.T) { } ddb := newDynamodbClientMock("TEST", ddbClientMock, 0) - err := ddb.Put(context.TODO(), dynamodbKey{primaryKey: "test", sortKey: "test1"}, []byte("TEST")) + _, err := ddb.Put(context.TODO(), dynamodbKey{primaryKey: "test", sortKey: "test1"}, []byte("TEST")) require.NoError(t, err) } @@ -41,7 +41,7 @@ func Test_TTL(t *testing.T) { } ddb := newDynamodbClientMock("TEST", ddbClientMock, 5*time.Hour) - err := ddb.Put(context.TODO(), dynamodbKey{primaryKey: "test", sortKey: "test1"}, []byte("TEST")) + 
_, err := ddb.Put(context.TODO(), dynamodbKey{primaryKey: "test", sortKey: "test1"}, []byte("TEST")) require.NoError(t, err) } @@ -72,7 +72,7 @@ func Test_Batch(t *testing.T) { } ddb := newDynamodbClientMock(tableName, ddbClientMock, 5*time.Hour) - err := ddb.Batch(context.TODO(), update, delete) + _, err := ddb.Batch(context.TODO(), update, delete) require.NoError(t, err) } @@ -120,7 +120,7 @@ func Test_BatchSlices(t *testing.T) { delete = append(delete, ddbKeyDelete) } - err := ddb.Batch(context.TODO(), nil, delete) + _, err := ddb.Batch(context.TODO(), nil, delete) require.NoError(t, err) require.EqualValues(t, tc.expectedCalls, numOfCalls) @@ -134,7 +134,7 @@ func Test_EmptyBatch(t *testing.T) { ddbClientMock := &mockDynamodb{} ddb := newDynamodbClientMock(tableName, ddbClientMock, 5*time.Hour) - err := ddb.Batch(context.TODO(), nil, nil) + _, err := ddb.Batch(context.TODO(), nil, nil) require.NoError(t, err) } @@ -159,7 +159,7 @@ func Test_Batch_UnprocessedItems(t *testing.T) { } ddb := newDynamodbClientMock(tableName, ddbClientMock, 5*time.Hour) - err := ddb.Batch(context.TODO(), nil, delete) + _, err := ddb.Batch(context.TODO(), nil, delete) require.Errorf(t, err, "error processing batch dynamodb") } @@ -178,7 +178,7 @@ func Test_Batch_Error(t *testing.T) { } ddb := newDynamodbClientMock(tableName, ddbClientMock, 5*time.Hour) - err := ddb.Batch(context.TODO(), nil, delete) + _, err := ddb.Batch(context.TODO(), nil, delete) require.Errorf(t, err, "mocked error") } diff --git a/pkg/ring/kv/dynamodb/metrics.go b/pkg/ring/kv/dynamodb/metrics.go index d47f2fe3929..356e58a3581 100644 --- a/pkg/ring/kv/dynamodb/metrics.go +++ b/pkg/ring/kv/dynamodb/metrics.go @@ -29,8 +29,8 @@ func newDynamoDbMetrics(registerer prometheus.Registerer) *dynamodbMetrics { }, []string{"operation", "status_code"})) dynamodbUsageMetrics := promauto.With(registerer).NewCounterVec(prometheus.CounterOpts{ - Name: "dynamodb_kv_read_capacity_total", - Help: "Total used read capacity on dynamodb", + Name: "dynamodb_kv_consumed_capacity_total", + Help: "Total consumed capacity on dynamodb", }, []string{"operation"}) dynamodbMetrics := dynamodbMetrics{ @@ -66,19 +66,25 @@ func (d dynamodbInstrumentation) Query(ctx context.Context, key dynamodbKey, isP func (d dynamodbInstrumentation) Delete(ctx context.Context, key dynamodbKey) error { return instrument.CollectedRequest(ctx, "Delete", d.ddbMetrics.dynamodbRequestDuration, errorCode, func(ctx context.Context) error { - return d.kv.Delete(ctx, key) + totalCapacity, err := d.kv.Delete(ctx, key) + d.ddbMetrics.dynamodbUsageMetrics.WithLabelValues("Delete").Add(totalCapacity) + return err }) } func (d dynamodbInstrumentation) Put(ctx context.Context, key dynamodbKey, data []byte) error { return instrument.CollectedRequest(ctx, "Put", d.ddbMetrics.dynamodbRequestDuration, errorCode, func(ctx context.Context) error { - return d.kv.Put(ctx, key, data) + totalCapacity, err := d.kv.Put(ctx, key, data) + d.ddbMetrics.dynamodbUsageMetrics.WithLabelValues("Put").Add(totalCapacity) + return err }) } func (d dynamodbInstrumentation) Batch(ctx context.Context, put map[dynamodbKey][]byte, delete []dynamodbKey) error { return instrument.CollectedRequest(ctx, "Batch", d.ddbMetrics.dynamodbRequestDuration, errorCode, func(ctx context.Context) error { - return d.kv.Batch(ctx, put, delete) + totalCapacity, err := d.kv.Batch(ctx, put, delete) + d.ddbMetrics.dynamodbUsageMetrics.WithLabelValues("Batch").Add(totalCapacity) + return err }) } diff --git a/pkg/storage/tsdb/multilevel_cache.go 
b/pkg/storage/tsdb/multilevel_cache.go index 73342e2daa4..5283eedd664 100644 --- a/pkg/storage/tsdb/multilevel_cache.go +++ b/pkg/storage/tsdb/multilevel_cache.go @@ -30,18 +30,35 @@ func (m *multiLevelCache) StorePostings(blockID ulid.ULID, l labels.Label, v []b func (m *multiLevelCache) FetchMultiPostings(ctx context.Context, blockID ulid.ULID, keys []labels.Label) (hits map[labels.Label][]byte, misses []labels.Label) { misses = keys hits = map[labels.Label][]byte{} - for _, c := range m.caches { - h, m := c.FetchMultiPostings(ctx, blockID, misses) - misses = m + backfillMap := map[storecache.IndexCache][]map[labels.Label][]byte{} + for i, c := range m.caches { + backfillMap[c] = []map[labels.Label][]byte{} + h, mi := c.FetchMultiPostings(ctx, blockID, misses) + misses = mi for label, bytes := range h { hits[label] = bytes } + + if i > 0 { + backfillMap[m.caches[i-1]] = append(backfillMap[m.caches[i-1]], h) + } + if len(misses) == 0 { break } } + defer func() { + for cache, hit := range backfillMap { + for _, values := range hit { + for l, b := range values { + cache.StorePostings(blockID, l, b) + } + } + } + }() + return hits, misses } @@ -59,8 +76,11 @@ func (m *multiLevelCache) StoreExpandedPostings(blockID ulid.ULID, matchers []*l } func (m *multiLevelCache) FetchExpandedPostings(ctx context.Context, blockID ulid.ULID, matchers []*labels.Matcher) ([]byte, bool) { - for _, c := range m.caches { + for i, c := range m.caches { if d, h := c.FetchExpandedPostings(ctx, blockID, matchers); h { + if i > 0 { + m.caches[i-1].StoreExpandedPostings(blockID, matchers, d) + } return d, h } } @@ -84,18 +104,36 @@ func (m *multiLevelCache) StoreSeries(blockID ulid.ULID, id storage.SeriesRef, v func (m *multiLevelCache) FetchMultiSeries(ctx context.Context, blockID ulid.ULID, ids []storage.SeriesRef) (hits map[storage.SeriesRef][]byte, misses []storage.SeriesRef) { misses = ids hits = map[storage.SeriesRef][]byte{} - for _, c := range m.caches { - h, m := c.FetchMultiSeries(ctx, blockID, misses) - misses = m + backfillMap := map[storecache.IndexCache][]map[storage.SeriesRef][]byte{} + + for i, c := range m.caches { + backfillMap[c] = []map[storage.SeriesRef][]byte{} + h, miss := c.FetchMultiSeries(ctx, blockID, misses) + misses = miss for label, bytes := range h { hits[label] = bytes } + + if i > 0 && len(h) > 0 { + backfillMap[m.caches[i-1]] = append(backfillMap[m.caches[i-1]], h) + } + if len(misses) == 0 { break } } + defer func() { + for cache, hit := range backfillMap { + for _, values := range hit { + for m, b := range values { + cache.StoreSeries(blockID, m, b) + } + } + } + }() + return hits, misses } diff --git a/pkg/storage/tsdb/multilevel_cache_test.go b/pkg/storage/tsdb/multilevel_cache_test.go index e5fcdb13533..749da9da5b5 100644 --- a/pkg/storage/tsdb/multilevel_cache_test.go +++ b/pkg/storage/tsdb/multilevel_cache_test.go @@ -148,9 +148,10 @@ func Test_MultiLevelCache(t *testing.T) { cache.FetchMultiPostings(ctx, bID, []labels.Label{l1, l2}) }, }, - "[FetchMultiPostings] should fallback only the missing keys on l1": { + "[FetchMultiPostings] should fallback and backfill only the missing keys on l1": { m1ExpectedCalls: map[string][][]interface{}{ "FetchMultiPostings": {{bID, []labels.Label{l1, l2}}}, + "StorePostings": {{bID, l2, v}}, }, m2ExpectedCalls: map[string][][]interface{}{ "FetchMultiPostings": {{bID, []labels.Label{l2}}}, @@ -158,6 +159,9 @@ func Test_MultiLevelCache(t *testing.T) { m1MockedCalls: map[string][]interface{}{ "FetchMultiPostings": {map[labels.Label][]byte{l1: 
make([]byte, 1)}, []labels.Label{l2}}, }, + m2MockedCalls: map[string][]interface{}{ + "FetchMultiPostings": {map[labels.Label][]byte{l2: v}, []labels.Label{}}, + }, call: func(cache storecache.IndexCache) { cache.FetchMultiPostings(ctx, bID, []labels.Label{l1, l2}) }, @@ -185,15 +189,19 @@ func Test_MultiLevelCache(t *testing.T) { cache.FetchMultiSeries(ctx, bID, []storage.SeriesRef{1, 2}) }, }, - "[FetchMultiSeries] should fallback only the missing keys on l1": { + "[FetchMultiSeries] should fallback and backfill only the missing keys on l1": { m1ExpectedCalls: map[string][][]interface{}{ "FetchMultiSeries": {{bID, []storage.SeriesRef{1, 2}}}, + "StoreSeries": {{bID, storage.SeriesRef(2), v}}, }, m2ExpectedCalls: map[string][][]interface{}{ "FetchMultiSeries": {{bID, []storage.SeriesRef{2}}}, }, m1MockedCalls: map[string][]interface{}{ - "FetchMultiSeries": {map[storage.SeriesRef][]byte{1: make([]byte, 1)}, []storage.SeriesRef{2}}, + "FetchMultiSeries": {map[storage.SeriesRef][]byte{1: v}, []storage.SeriesRef{2}}, + }, + m2MockedCalls: map[string][]interface{}{ + "FetchMultiSeries": {map[storage.SeriesRef][]byte{2: v}, []storage.SeriesRef{2}}, }, call: func(cache storecache.IndexCache) { cache.FetchMultiSeries(ctx, bID, []storage.SeriesRef{1, 2}) @@ -211,13 +219,17 @@ func Test_MultiLevelCache(t *testing.T) { cache.FetchMultiSeries(ctx, bID, []storage.SeriesRef{1, 2}) }, }, - "[FetchExpandedPostings] Should fallback when miss": { + "[FetchExpandedPostings] Should fallback and backfill when miss": { m1ExpectedCalls: map[string][][]interface{}{ + "StoreExpandedPostings": {{bID, []*labels.Matcher{matcher}, v}}, "FetchExpandedPostings": {{bID, []*labels.Matcher{matcher}}}, }, m2ExpectedCalls: map[string][][]interface{}{ "FetchExpandedPostings": {{bID, []*labels.Matcher{matcher}}}, }, + m2MockedCalls: map[string][]interface{}{ + "FetchExpandedPostings": {v, true}, + }, call: func(cache storecache.IndexCache) { cache.FetchExpandedPostings(ctx, bID, []*labels.Matcher{matcher}) },