@@ -29,8 +29,11 @@ import (
2929 "github.com/thanos-io/thanos/pkg/store/storepb"
3030 "github.com/weaveworks/common/httpgrpc"
3131 "github.com/weaveworks/common/logging"
32+ "google.golang.org/grpc/codes"
3233 "google.golang.org/grpc/metadata"
3334
35+ "github.com/cortexproject/cortex/pkg/storage/tsdb/bucketindex"
36+
3437 "github.com/cortexproject/cortex/pkg/storage/bucket"
3538 "github.com/cortexproject/cortex/pkg/storage/tsdb"
3639 "github.com/cortexproject/cortex/pkg/util/backoff"
@@ -64,7 +67,7 @@ type BucketStores struct {
6467
6568 // Keeps a bucket store for each tenant.
6669 storesMu sync.RWMutex
67- stores map [string ]* store. BucketStore
70+ stores map [string ]* BucketStoreWithLastError
6871
6972 // Metrics.
7073 syncTimes prometheus.Histogram
@@ -73,6 +76,11 @@ type BucketStores struct {
7376 tenantsSynced prometheus.Gauge
7477}
7578
79+ type BucketStoreWithLastError struct {
80+ * store.BucketStore
81+ err error
82+ }
83+
7684// NewBucketStores makes a new BucketStores.
7785func NewBucketStores (cfg tsdb.BlocksStorageConfig , shardingStrategy ShardingStrategy , bucketClient objstore.Bucket , limits * validation.Overrides , logLevel logging.Level , logger log.Logger , reg prometheus.Registerer ) (* BucketStores , error ) {
7886 cachingBucket , err := tsdb .CreateCachingBucket (cfg .BucketStore .ChunksCache , cfg .BucketStore .MetadataCache , bucketClient , logger , reg )
@@ -94,7 +102,7 @@ func NewBucketStores(cfg tsdb.BlocksStorageConfig, shardingStrategy ShardingStra
94102 limits : limits ,
95103 bucket : cachingBucket ,
96104 shardingStrategy : shardingStrategy ,
97- stores : map [string ]* store. BucketStore {},
105+ stores : map [string ]* BucketStoreWithLastError {},
98106 logLevel : logLevel ,
99107 bucketStoreMetrics : NewBucketStoreMetrics (),
100108 metaFetcherMetrics : NewMetadataFetcherMetrics (),
@@ -192,7 +200,7 @@ func (u *BucketStores) syncUsersBlocks(ctx context.Context, f func(context.Conte
192200
193201 type job struct {
194202 userID string
195- store * store. BucketStore
203+ store * BucketStoreWithLastError
196204 }
197205
198206 wg := & sync.WaitGroup {}
@@ -225,10 +233,16 @@ func (u *BucketStores) syncUsersBlocks(ctx context.Context, f func(context.Conte
225233 defer wg .Done ()
226234
227235 for job := range jobs {
228- if err := f (ctx , job .store ); err != nil {
229- errsMx .Lock ()
230- errs .Add (errors .Wrapf (err , "failed to synchronize TSDB blocks for user %s" , job .userID ))
231- errsMx .Unlock ()
236+ if err := f (ctx , job .store .BucketStore ); err != nil {
237+ if errors .Is (err , bucketindex .ErrCustomerManagedKeyError ) {
238+ job .store .err = err
239+ } else {
240+ errsMx .Lock ()
241+ errs .Add (errors .Wrapf (err , "failed to synchronize TSDB blocks for user %s" , job .userID ))
242+ errsMx .Unlock ()
243+ }
244+ } else {
245+ job .store .err = nil
232246 }
233247 }
234248 }()
@@ -286,10 +300,20 @@ func (u *BucketStores) Series(req *storepb.SeriesRequest, srv storepb.Store_Seri
286300 return nil
287301 }
288302
289- return store .Series (req , spanSeriesServer {
303+ if store .err != nil && errors .Is (store .err , bucketindex .ErrCustomerManagedKeyError ) {
304+ return httpgrpc .Errorf (int (codes .ResourceExhausted ), "store error: %s" , store .err )
305+ }
306+
307+ err := store .Series (req , spanSeriesServer {
290308 Store_SeriesServer : srv ,
291309 ctx : spanCtx ,
292310 })
311+
312+ if err != nil && errors .Is (err , bucketindex .ErrCustomerManagedKeyError ) {
313+ return httpgrpc .Errorf (int (codes .ResourceExhausted ), "store error: %s" , err )
314+ }
315+
316+ return err
293317}
294318
295319// LabelNames implements the Storegateway proto service.
@@ -344,7 +368,7 @@ func (u *BucketStores) scanUsers(ctx context.Context) ([]string, error) {
344368 return users , err
345369}
346370
347- func (u * BucketStores ) getStore (userID string ) * store. BucketStore {
371+ func (u * BucketStores ) getStore (userID string ) * BucketStoreWithLastError {
348372 u .storesMu .RLock ()
349373 defer u .storesMu .RUnlock ()
350374 return u .stores [userID ]
@@ -387,7 +411,7 @@ func (u *BucketStores) closeEmptyBucketStore(userID string) error {
387411 return bs .Close ()
388412}
389413
390- func isEmptyBucketStore (bs * store. BucketStore ) bool {
414+ func isEmptyBucketStore (bs * BucketStoreWithLastError ) bool {
391415 min , max := bs .TimeRange ()
392416 return min == math .MaxInt64 && max == math .MinInt64
393417}
@@ -396,7 +420,7 @@ func (u *BucketStores) syncDirForUser(userID string) string {
396420 return filepath .Join (u .cfg .BucketStore .SyncDir , userID )
397421}
398422
399- func (u * BucketStores ) getOrCreateStore (userID string ) (* store. BucketStore , error ) {
423+ func (u * BucketStores ) getOrCreateStore (userID string ) (* BucketStoreWithLastError , error ) {
400424 // Check if the store already exists.
401425 bs := u .getStore (userID )
402426 if bs != nil {
@@ -500,7 +524,7 @@ func (u *BucketStores) getOrCreateStore(userID string) (*store.BucketStore, erro
500524 bucketStoreOpts = append (bucketStoreOpts , store .WithDebugLogging ())
501525 }
502526
503- bs , err := store .NewBucketStore (
527+ s , err := store .NewBucketStore (
504528 userBkt ,
505529 fetcher ,
506530 u .syncDirForUser (userID ),
@@ -520,6 +544,10 @@ func (u *BucketStores) getOrCreateStore(userID string) (*store.BucketStore, erro
520544 return nil , err
521545 }
522546
547+ bs = & BucketStoreWithLastError {
548+ BucketStore : s ,
549+ }
550+
523551 u .stores [userID ] = bs
524552 u .metaFetcherMetrics .AddUserRegistry (userID , fetcherReg )
525553 u .bucketStoreMetrics .AddUserRegistry (userID , bucketStoreReg )
0 commit comments