diff --git a/CHANGELOG.md b/CHANGELOG.md index 6119b552e63..c7c09eac791 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -36,6 +36,7 @@ * [ENHANCEMENT] Add jitter to lifecycler heartbeat. #5404 * [ENHANCEMENT] Store Gateway: Add config `estimated_max_series_size_bytes` and `estimated_max_chunk_size_bytes` to address data overfetch. #5401 * [ENHANCEMENT] Distributor/Ingester: Add experimental `-distributor.sign_write_requests` flag to sign the write requests. #5430 +* [ENHANCEMENT] Store Gateway/Querier/Compactor: Handling CMK Access Denied errors. #5420 #5442 * [BUGFIX] Ruler: Validate if rule group can be safely converted back to rule group yaml from protobuf message #5265 * [BUGFIX] Querier: Convert gRPC `ResourceExhausted` status code from store gateway to 422 limit error. #5286 * [BUGFIX] Alertmanager: Route web-ui requests to the alertmanager distributor when sharding is enabled. #5293 diff --git a/pkg/querier/blocks_store_queryable.go b/pkg/querier/blocks_store_queryable.go index 27e120c80b3..c355be998cc 100644 --- a/pkg/querier/blocks_store_queryable.go +++ b/pkg/querier/blocks_store_queryable.go @@ -648,6 +648,10 @@ func (q *blocksStoreQuerier) fetchSeriesFromStores( if s.Code() == codes.ResourceExhausted { return validation.LimitError(s.Message()) } + + if s.Code() == codes.PermissionDenied { + return validation.AccessDeniedError(s.Message()) + } } return errors.Wrapf(err, "failed to receive series from %s", c.RemoteAddress()) } @@ -816,6 +820,10 @@ func (q *blocksStoreQuerier) fetchLabelNamesFromStore( return validation.LimitError(s.Message()) } } + + if s.Code() == codes.PermissionDenied { + return validation.AccessDeniedError(s.Message()) + } return errors.Wrapf(err, "failed to fetch label names from %s", c.RemoteAddress()) } @@ -907,6 +915,10 @@ func (q *blocksStoreQuerier) fetchLabelValuesFromStore( if s.Code() == codes.ResourceExhausted { return validation.LimitError(s.Message()) } + + if s.Code() == codes.PermissionDenied { + return validation.AccessDeniedError(s.Message()) + } } return errors.Wrapf(err, "failed to fetch label values from %s", c.RemoteAddress()) } diff --git a/pkg/querier/blocks_store_queryable_test.go b/pkg/querier/blocks_store_queryable_test.go index 5dcfd03d37e..c4d925d4caf 100644 --- a/pkg/querier/blocks_store_queryable_test.go +++ b/pkg/querier/blocks_store_queryable_test.go @@ -638,6 +638,44 @@ func TestBlocksStoreQuerier_Select(t *testing.T) { }, }, }, + "all store-gateways return PermissionDenied": { + finderResult: bucketindex.Blocks{ + {ID: block1}, + }, + expectedErr: validation.AccessDeniedError("PermissionDenied"), + storeSetResponses: []interface{}{ + map[BlocksStoreClient][]ulid.ULID{ + &storeGatewayClientMock{ + remoteAddr: "1.1.1.1", + mockedSeriesResponses: []*storepb.SeriesResponse{ + mockSeriesResponse(labels.Labels{metricNameLabel, series1Label}, minT, 2), + mockHintsResponse(block1), + }, + mockedSeriesStreamErr: status.Error(codes.PermissionDenied, "PermissionDenied"), + }: {block1}, + }, + map[BlocksStoreClient][]ulid.ULID{ + &storeGatewayClientMock{ + remoteAddr: "2.2.2.2", + mockedSeriesResponses: []*storepb.SeriesResponse{ + mockSeriesResponse(labels.Labels{metricNameLabel, series1Label}, minT, 2), + mockHintsResponse(block1), + }, + mockedSeriesStreamErr: status.Error(codes.PermissionDenied, "PermissionDenied"), + }: {block1}, + }, + }, + limits: &blocksStoreLimitsMock{}, + queryLimiter: noOpQueryLimiter, + expectedSeries: []seriesResult{ + { + lbls: labels.New(metricNameLabel, series1Label), + values: []valueResult{ + {t: minT, v: 2}, + }, + }, + }, + }, "multiple store-gateways has the block, but one of them fails to return on stream": { finderResult: bucketindex.Blocks{ {ID: block1}, diff --git a/pkg/storage/bucket/sse_bucket_client.go b/pkg/storage/bucket/sse_bucket_client.go index 36c1678c160..613756a7873 100644 --- a/pkg/storage/bucket/sse_bucket_client.go +++ b/pkg/storage/bucket/sse_bucket_client.go @@ -4,10 +4,14 @@ import ( "context" "io" + "github.com/gogo/status" "github.com/minio/minio-go/v7/pkg/encrypt" "github.com/pkg/errors" "github.com/thanos-io/objstore" "github.com/thanos-io/objstore/providers/s3" + "google.golang.org/grpc/codes" + + cortex_errors "github.com/cortexproject/cortex/pkg/util/errors" cortex_s3 "github.com/cortexproject/cortex/pkg/storage/bucket/s3" ) @@ -101,12 +105,24 @@ func (b *SSEBucketClient) Iter(ctx context.Context, dir string, f func(string) e // Get implements objstore.Bucket. func (b *SSEBucketClient) Get(ctx context.Context, name string) (io.ReadCloser, error) { - return b.bucket.Get(ctx, name) + r, err := b.bucket.Get(ctx, name) + + if err != nil && b.IsCustomerManagedKeyError(err) { + // Store gateway will return the status if the returned error is an `status.Error` + return nil, cortex_errors.WithCause(err, status.Error(codes.PermissionDenied, err.Error())) + } + + return r, err } // GetRange implements objstore.Bucket. func (b *SSEBucketClient) GetRange(ctx context.Context, name string, off, length int64) (io.ReadCloser, error) { - return b.bucket.GetRange(ctx, name, off, length) + r, err := b.bucket.GetRange(ctx, name, off, length) + if err != nil && b.IsCustomerManagedKeyError(err) { + return nil, cortex_errors.WithCause(err, status.Error(codes.PermissionDenied, err.Error())) + } + + return r, err } // Exists implements objstore.Bucket. @@ -119,7 +135,12 @@ func (b *SSEBucketClient) IsObjNotFoundErr(err error) bool { return b.bucket.IsObjNotFoundErr(err) } +// IsCustomerManagedKeyError implements objstore.Bucket. func (b *SSEBucketClient) IsCustomerManagedKeyError(err error) bool { + // unwrap error + if se, ok := err.(interface{ Err() error }); ok { + return b.bucket.IsCustomerManagedKeyError(se.Err()) || b.bucket.IsCustomerManagedKeyError(err) + } return b.bucket.IsCustomerManagedKeyError(err) } diff --git a/pkg/storage/bucket/sse_bucket_client_test.go b/pkg/storage/bucket/sse_bucket_client_test.go index 6b99af304c7..45431dce8a2 100644 --- a/pkg/storage/bucket/sse_bucket_client_test.go +++ b/pkg/storage/bucket/sse_bucket_client_test.go @@ -106,6 +106,19 @@ func TestSSEBucketClient_Upload_ShouldInjectCustomSSEConfig(t *testing.T) { } } +func Test_shouldWrapSSeErrors(t *testing.T) { + cfgProvider := &mockTenantConfigProvider{} + + bkt := &ClientMock{} + + bkt.MockGet("Test", "someContent", errKeyPermissionDenied) + + sseBkt := NewSSEBucketClient("user-1", bkt, cfgProvider) + + _, err := sseBkt.Get(context.Background(), "Test") + require.True(t, sseBkt.IsCustomerManagedKeyError(err)) +} + type mockTenantConfigProvider struct { s3SseType string s3KmsKeyID string diff --git a/pkg/storage/tsdb/testutil/objstore.go b/pkg/storage/tsdb/testutil/objstore.go index 1962c07b554..353fba0f10e 100644 --- a/pkg/storage/tsdb/testutil/objstore.go +++ b/pkg/storage/tsdb/testutil/objstore.go @@ -51,6 +51,20 @@ func (m *MockBucketFailure) Delete(ctx context.Context, name string) error { return m.Bucket.Delete(ctx, name) } +func (m *MockBucketFailure) GetRange(ctx context.Context, name string, off, length int64) (io.ReadCloser, error) { + m.GetCalls.Add(1) + for prefix, err := range m.GetFailures { + if strings.HasPrefix(name, prefix) { + return nil, err + } + } + if e, ok := m.GetFailures[name]; ok { + return nil, e + } + + return m.Bucket.GetRange(ctx, name, off, length) +} + func (m *MockBucketFailure) Get(ctx context.Context, name string) (io.ReadCloser, error) { m.GetCalls.Add(1) for prefix, err := range m.GetFailures { @@ -96,5 +110,5 @@ func (m *MockBucketFailure) ReaderWithExpectedErrs(expectedFunc objstore.IsOpFai } func (m *MockBucketFailure) IsCustomerManagedKeyError(err error) bool { - return errors.Is(err, ErrKeyAccessDeniedError) + return errors.Is(errors.Cause(err), ErrKeyAccessDeniedError) } diff --git a/pkg/storegateway/bucket_stores.go b/pkg/storegateway/bucket_stores.go index 287fe4d0d26..1a3b791445c 100644 --- a/pkg/storegateway/bucket_stores.go +++ b/pkg/storegateway/bucket_stores.go @@ -299,14 +299,15 @@ func (u *BucketStores) Series(req *storepb.SeriesRequest, srv storepb.Store_Seri } store := u.getStore(userID) + userBkt := bucket.NewUserBucketClient(userID, u.bucket, u.limits) if store == nil { return nil } err := u.getStoreError(userID) - if err != nil && cortex_errors.ErrorIs(err, u.bucket.IsCustomerManagedKeyError) { - return httpgrpc.Errorf(int(codes.ResourceExhausted), "store error: %s", err) + if err != nil && cortex_errors.ErrorIs(err, userBkt.IsCustomerManagedKeyError) { + return httpgrpc.Errorf(int(codes.PermissionDenied), "store error: %s", err) } err = store.Series(req, spanSeriesServer{ @@ -314,10 +315,6 @@ func (u *BucketStores) Series(req *storepb.SeriesRequest, srv storepb.Store_Seri ctx: spanCtx, }) - if err != nil && cortex_errors.ErrorIs(err, u.bucket.IsCustomerManagedKeyError) { - return httpgrpc.Errorf(int(codes.ResourceExhausted), "store error: %s", err) - } - return err } @@ -332,22 +329,19 @@ func (u *BucketStores) LabelNames(ctx context.Context, req *storepb.LabelNamesRe } store := u.getStore(userID) + userBkt := bucket.NewUserBucketClient(userID, u.bucket, u.limits) if store == nil { return &storepb.LabelNamesResponse{}, nil } err := u.getStoreError(userID) - if err != nil && cortex_errors.ErrorIs(err, u.bucket.IsCustomerManagedKeyError) { - return nil, httpgrpc.Errorf(int(codes.ResourceExhausted), "store error: %s", err) + if err != nil && cortex_errors.ErrorIs(err, userBkt.IsCustomerManagedKeyError) { + return nil, httpgrpc.Errorf(int(codes.PermissionDenied), "store error: %s", err) } resp, err := store.LabelNames(ctx, req) - if err != nil && cortex_errors.ErrorIs(err, u.bucket.IsCustomerManagedKeyError) { - return resp, httpgrpc.Errorf(int(codes.ResourceExhausted), "store error: %s", err) - } - return resp, err } @@ -362,22 +356,19 @@ func (u *BucketStores) LabelValues(ctx context.Context, req *storepb.LabelValues } store := u.getStore(userID) + userBkt := bucket.NewUserBucketClient(userID, u.bucket, u.limits) if store == nil { return &storepb.LabelValuesResponse{}, nil } err := u.getStoreError(userID) - if err != nil && cortex_errors.ErrorIs(err, u.bucket.IsCustomerManagedKeyError) { - return nil, httpgrpc.Errorf(int(codes.ResourceExhausted), "store error: %s", err) + if err != nil && cortex_errors.ErrorIs(err, userBkt.IsCustomerManagedKeyError) { + return nil, httpgrpc.Errorf(int(codes.PermissionDenied), "store error: %s", err) } resp, err := store.LabelValues(ctx, req) - if err != nil && cortex_errors.ErrorIs(err, u.bucket.IsCustomerManagedKeyError) { - return resp, httpgrpc.Errorf(int(codes.ResourceExhausted), "store error: %s", err) - } - return resp, err } diff --git a/pkg/storegateway/bucket_stores_test.go b/pkg/storegateway/bucket_stores_test.go index d7585941163..4e82b885253 100644 --- a/pkg/storegateway/bucket_stores_test.go +++ b/pkg/storegateway/bucket_stores_test.go @@ -2,11 +2,13 @@ package storegateway import ( "context" + "encoding/json" "errors" "fmt" "io" "math" "os" + "path" "path/filepath" "sort" "strings" @@ -14,6 +16,7 @@ import ( "time" "github.com/go-kit/log" + "github.com/gogo/status" "github.com/oklog/ulid" "github.com/prometheus/client_golang/prometheus" "github.com/prometheus/client_golang/prometheus/testutil" @@ -29,12 +32,13 @@ import ( "github.com/thanos-io/thanos/pkg/store" "github.com/thanos-io/thanos/pkg/store/labelpb" "github.com/thanos-io/thanos/pkg/store/storepb" - "github.com/weaveworks/common/httpgrpc" "github.com/weaveworks/common/logging" "go.uber.org/atomic" "google.golang.org/grpc/codes" "google.golang.org/grpc/metadata" + "github.com/cortexproject/cortex/pkg/storage/tsdb/bucketindex" + cortex_testutil "github.com/cortexproject/cortex/pkg/storage/tsdb/testutil" "github.com/cortexproject/cortex/pkg/storage/bucket" @@ -46,8 +50,8 @@ import ( func TestBucketStores_CustomerKeyError(t *testing.T) { userToMetric := map[string]string{ - "user-1": "series_1", - "user-2": "series_2", + "user-1": "series", + "user-2": "series", } ctx := context.Background() @@ -62,52 +66,116 @@ func TestBucketStores_CustomerKeyError(t *testing.T) { b, err := filesystem.NewBucketClient(filesystem.Config{Directory: storageDir}) - mBucket := &cortex_testutil.MockBucketFailure{ - Bucket: b, - GetFailures: map[string]error{ - "user-1": cortex_testutil.ErrKeyAccessDeniedError, + bucketIndexes := map[string]*bucketindex.Index{} + // Generate Bucket Index + for userID := range userToMetric { + idx := &bucketindex.Index{ + Version: bucketindex.IndexVersion1, + UpdatedAt: time.Now().Unix(), + } + err := b.Iter(ctx, userID, func(s string) error { + if id, isBlock := block.IsBlockDir(s); isBlock { + metaFile := path.Join(userID, id.String(), block.MetaFilename) + r, err := b.Get(ctx, metaFile) + require.NoError(t, err) + metaContent, err := io.ReadAll(r) + require.NoError(t, err) + // Unmarshal it. + m := thanos_metadata.Meta{} + if err := json.Unmarshal(metaContent, &m); err != nil { + require.NoError(t, err) + } + + idx.Blocks = append(idx.Blocks, bucketindex.BlockFromThanosMeta(m)) + } + return nil + }) + + require.NoError(t, err) + require.NoError(t, bucketindex.WriteIndex(ctx, b, userID, nil, idx)) + bucketIndexes[userID] = idx + } + + cases := map[string]struct { + mockInitialSync bool + GetFailures map[string]error + }{ + "should return ResourceExhausted when fail to get bucket index": { + mockInitialSync: true, + GetFailures: map[string]error{ + "user-1/bucket-index.json.gz": cortex_testutil.ErrKeyAccessDeniedError, + }, + }, + "should return ResourceExhausted when fail to block index": { + mockInitialSync: false, + GetFailures: map[string]error{ + "user-1/" + bucketIndexes["user-1"].Blocks[0].ID.String() + "/index": cortex_testutil.ErrKeyAccessDeniedError, + }, }, } - require.NoError(t, err) - reg := prometheus.NewPedanticRegistry() - stores, err := NewBucketStores(cfg, NewNoShardingStrategy(), mBucket, defaultLimitsOverrides(t), mockLoggingLevel(), log.NewNopLogger(), reg) - require.NoError(t, err) + for name, tc := range cases { + t.Run(name, func(t *testing.T) { + mBucket := &cortex_testutil.MockBucketFailure{ + Bucket: b, + } + require.NoError(t, err) - // Should set the error on user-1 - require.NoError(t, stores.InitialSync(ctx)) - require.ErrorIs(t, stores.storesErrors["user-1"], bucket.ErrCustomerManagedKeyAccessDenied) - require.ErrorIs(t, stores.storesErrors["user-2"], nil) - require.NoError(t, stores.SyncBlocks(context.Background())) - require.ErrorIs(t, stores.storesErrors["user-1"], bucket.ErrCustomerManagedKeyAccessDenied) - require.ErrorIs(t, stores.storesErrors["user-2"], nil) - - _, _, err = querySeries(stores, "user-1", "anything", 0, 100) - require.Equal(t, err, httpgrpc.Errorf(int(codes.ResourceExhausted), "store error: %s", bucket.ErrCustomerManagedKeyAccessDenied)) - _, err = queryLabelsNames(stores, "user-1", "anything") - require.Equal(t, err, httpgrpc.Errorf(int(codes.ResourceExhausted), "store error: %s", bucket.ErrCustomerManagedKeyAccessDenied)) - _, err = queryLabelsValues(stores, "user-1", "anything") - require.Equal(t, err, httpgrpc.Errorf(int(codes.ResourceExhausted), "store error: %s", bucket.ErrCustomerManagedKeyAccessDenied)) - _, _, err = querySeries(stores, "user-2", "anything", 0, 100) - require.NoError(t, err) - _, err = queryLabelsNames(stores, "user-1", "anything") - require.Equal(t, err, httpgrpc.Errorf(int(codes.ResourceExhausted), "store error: %s", bucket.ErrCustomerManagedKeyAccessDenied)) - _, err = queryLabelsValues(stores, "user-1", "anything") - require.Equal(t, err, httpgrpc.Errorf(int(codes.ResourceExhausted), "store error: %s", bucket.ErrCustomerManagedKeyAccessDenied)) - - // Cleaning the error - mBucket.GetFailures = map[string]error{} - require.NoError(t, stores.SyncBlocks(context.Background())) - require.ErrorIs(t, stores.storesErrors["user-1"], nil) - require.ErrorIs(t, stores.storesErrors["user-2"], nil) - _, _, err = querySeries(stores, "user-1", "anything", 0, 100) - require.NoError(t, err) - _, _, err = querySeries(stores, "user-2", "anything", 0, 100) - require.NoError(t, err) - _, err = queryLabelsNames(stores, "user-1", "anything") - require.NoError(t, err) - _, err = queryLabelsValues(stores, "user-1", "anything") - require.NoError(t, err) + reg := prometheus.NewPedanticRegistry() + stores, err := NewBucketStores(cfg, NewNoShardingStrategy(), mBucket, defaultLimitsOverrides(t), mockLoggingLevel(), log.NewNopLogger(), reg) + require.NoError(t, err) + + if tc.mockInitialSync { + mBucket.GetFailures = tc.GetFailures + } + + // Should set the error on user-1 + require.NoError(t, stores.InitialSync(ctx)) + if tc.mockInitialSync { + require.ErrorIs(t, stores.storesErrors["user-1"], bucket.ErrCustomerManagedKeyAccessDenied) + require.ErrorIs(t, stores.storesErrors["user-2"], nil) + } + require.NoError(t, stores.SyncBlocks(context.Background())) + if tc.mockInitialSync { + require.ErrorIs(t, stores.storesErrors["user-1"], bucket.ErrCustomerManagedKeyAccessDenied) + require.ErrorIs(t, stores.storesErrors["user-2"], nil) + } + + mBucket.GetFailures = tc.GetFailures + + _, _, err = querySeries(stores, "user-1", "series", 0, 100) + s, _ := status.FromError(err) + require.Equal(t, codes.PermissionDenied, s.Code()) + _, err = queryLabelsNames(stores, "user-1", "series", 0, 100) + s, _ = status.FromError(err) + require.Equal(t, codes.PermissionDenied, s.Code()) + _, err = queryLabelsValues(stores, "user-1", "__name__", "series", 0, 100) + s, _ = status.FromError(err) + require.Equal(t, codes.PermissionDenied, s.Code()) + _, _, err = querySeries(stores, "user-2", "series", 0, 100) + require.NoError(t, err) + _, err = queryLabelsNames(stores, "user-1", "series", 0, 100) + s, _ = status.FromError(err) + require.Equal(t, codes.PermissionDenied, s.Code()) + _, err = queryLabelsValues(stores, "user-1", "__name__", "series", 0, 100) + s, _ = status.FromError(err) + require.Equal(t, codes.PermissionDenied, s.Code()) + + // Cleaning the error + mBucket.GetFailures = map[string]error{} + require.NoError(t, stores.SyncBlocks(context.Background())) + require.ErrorIs(t, stores.storesErrors["user-1"], nil) + require.ErrorIs(t, stores.storesErrors["user-2"], nil) + _, _, err = querySeries(stores, "user-1", "series", 0, 100) + require.NoError(t, err) + _, _, err = querySeries(stores, "user-2", "series", 0, 100) + require.NoError(t, err) + _, err = queryLabelsNames(stores, "user-1", "series", 0, 100) + require.NoError(t, err) + _, err = queryLabelsValues(stores, "user-1", "__name__", "series", 0, 100) + require.NoError(t, err) + }) + } } func TestBucketStores_InitialSync(t *testing.T) { @@ -499,8 +567,10 @@ func querySeries(stores *BucketStores, userID, metricName string, minT, maxT int return srv.SeriesSet, srv.Warnings, err } -func queryLabelsNames(stores *BucketStores, userID, metricName string) (*storepb.LabelNamesResponse, error) { +func queryLabelsNames(stores *BucketStores, userID, metricName string, start, end int64) (*storepb.LabelNamesResponse, error) { req := &storepb.LabelNamesRequest{ + Start: start, + End: end, Matchers: []storepb.LabelMatcher{{ Type: storepb.LabelMatcher_EQ, Name: labels.MetricName, @@ -513,8 +583,11 @@ func queryLabelsNames(stores *BucketStores, userID, metricName string) (*storepb return stores.LabelNames(ctx, req) } -func queryLabelsValues(stores *BucketStores, userID, metricName string) (*storepb.LabelValuesResponse, error) { +func queryLabelsValues(stores *BucketStores, userID, labelName, metricName string, start, end int64) (*storepb.LabelValuesResponse, error) { req := &storepb.LabelValuesRequest{ + Start: start, + End: end, + Label: labelName, Matchers: []storepb.LabelMatcher{{ Type: storepb.LabelMatcher_EQ, Name: labels.MetricName, diff --git a/pkg/util/errors/errors.go b/pkg/util/errors/errors.go index d5c19f363bf..141462b531e 100644 --- a/pkg/util/errors/errors.go +++ b/pkg/util/errors/errors.go @@ -26,6 +26,11 @@ func (e errWithCause) Unwrap() error { return e.cause } +// Err return the original error +func (e errWithCause) Err() error { + return e.error +} + // WithCause wrappers err with a error cause func WithCause(err, cause error) error { return errWithCause{ diff --git a/pkg/util/errors/errors_test.go b/pkg/util/errors/errors_test.go new file mode 100644 index 00000000000..32255d43b95 --- /dev/null +++ b/pkg/util/errors/errors_test.go @@ -0,0 +1,41 @@ +package errors + +import ( + "testing" + + "github.com/pkg/errors" +) + +var ( + errToTest = errors.New("err") +) + +func TestErrorIs(t *testing.T) { + type args struct { + err error + f func(err error) bool + } + tests := []struct { + name string + args args + want bool + }{ + { + name: "should unwrap error", + want: true, + args: args{ + err: errors.Wrap(errors.Wrap(errToTest, "outer1"), "outer2"), + f: func(err error) bool { + return err == errToTest + }, + }, + }, + } + for _, tt := range tests { + t.Run(tt.name, func(t *testing.T) { + if got := ErrorIs(tt.args.err, tt.args.f); got != tt.want { + t.Errorf("ErrorIs() = %v, want %v", got, tt.want) + } + }) + } +}