Skip to content
Merged
Show file tree
Hide file tree
Changes from 4 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -36,6 +36,7 @@
* [ENHANCEMENT] Add jitter to lifecycler heartbeat. #5404
* [ENHANCEMENT] Store Gateway: Add config `estimated_max_series_size_bytes` and `estimated_max_chunk_size_bytes` to address data overfetch. #5401
* [ENHANCEMENT] Distributor/Ingester: Add experimental `-distributor.sign_write_requests` flag to sign the write requests. #5430
* [ENHANCEMENT] Store Gateway/Querier/Compactor: Handling CMK Access Denied errors. #5420 #5442
* [BUGFIX] Ruler: Validate if rule group can be safely converted back to rule group yaml from protobuf message #5265
* [BUGFIX] Querier: Convert gRPC `ResourceExhausted` status code from store gateway to 422 limit error. #5286
* [BUGFIX] Alertmanager: Route web-ui requests to the alertmanager distributor when sharding is enabled. #5293
Expand Down
12 changes: 12 additions & 0 deletions pkg/querier/blocks_store_queryable.go
Original file line number Diff line number Diff line change
Expand Up @@ -648,6 +648,10 @@ func (q *blocksStoreQuerier) fetchSeriesFromStores(
if s.Code() == codes.ResourceExhausted {
return validation.LimitError(s.Message())
}

if s.Code() == codes.PermissionDenied {
return validation.AccessDeniedError(s.Message())
}
}
return errors.Wrapf(err, "failed to receive series from %s", c.RemoteAddress())
}
Expand Down Expand Up @@ -816,6 +820,10 @@ func (q *blocksStoreQuerier) fetchLabelNamesFromStore(
return validation.LimitError(s.Message())
}
}

if s.Code() == codes.PermissionDenied {
return validation.AccessDeniedError(s.Message())
}
return errors.Wrapf(err, "failed to fetch label names from %s", c.RemoteAddress())
}

Expand Down Expand Up @@ -907,6 +915,10 @@ func (q *blocksStoreQuerier) fetchLabelValuesFromStore(
if s.Code() == codes.ResourceExhausted {
return validation.LimitError(s.Message())
}

if s.Code() == codes.PermissionDenied {
return validation.AccessDeniedError(s.Message())
}
}
return errors.Wrapf(err, "failed to fetch label values from %s", c.RemoteAddress())
}
Expand Down
38 changes: 38 additions & 0 deletions pkg/querier/blocks_store_queryable_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -638,6 +638,44 @@ func TestBlocksStoreQuerier_Select(t *testing.T) {
},
},
},
"all store-gateways return PermissionDenied": {
finderResult: bucketindex.Blocks{
{ID: block1},
},
expectedErr: validation.AccessDeniedError("PermissionDenied"),
storeSetResponses: []interface{}{
map[BlocksStoreClient][]ulid.ULID{
&storeGatewayClientMock{
remoteAddr: "1.1.1.1",
mockedSeriesResponses: []*storepb.SeriesResponse{
mockSeriesResponse(labels.Labels{metricNameLabel, series1Label}, minT, 2),
mockHintsResponse(block1),
},
mockedSeriesStreamErr: status.Error(codes.PermissionDenied, "PermissionDenied"),
}: {block1},
},
map[BlocksStoreClient][]ulid.ULID{
&storeGatewayClientMock{
remoteAddr: "2.2.2.2",
mockedSeriesResponses: []*storepb.SeriesResponse{
mockSeriesResponse(labels.Labels{metricNameLabel, series1Label}, minT, 2),
mockHintsResponse(block1),
},
mockedSeriesStreamErr: status.Error(codes.PermissionDenied, "PermissionDenied"),
}: {block1},
},
},
limits: &blocksStoreLimitsMock{},
queryLimiter: noOpQueryLimiter,
expectedSeries: []seriesResult{
{
lbls: labels.New(metricNameLabel, series1Label),
values: []valueResult{
{t: minT, v: 2},
},
},
},
},
"multiple store-gateways has the block, but one of them fails to return on stream": {
finderResult: bucketindex.Blocks{
{ID: block1},
Expand Down
25 changes: 23 additions & 2 deletions pkg/storage/bucket/sse_bucket_client.go
Original file line number Diff line number Diff line change
Expand Up @@ -4,10 +4,14 @@ import (
"context"
"io"

"github.com/gogo/status"
"github.com/minio/minio-go/v7/pkg/encrypt"
"github.com/pkg/errors"
"github.com/thanos-io/objstore"
"github.com/thanos-io/objstore/providers/s3"
"google.golang.org/grpc/codes"

cortex_errors "github.com/cortexproject/cortex/pkg/util/errors"

cortex_s3 "github.com/cortexproject/cortex/pkg/storage/bucket/s3"
)
Expand Down Expand Up @@ -101,12 +105,24 @@ func (b *SSEBucketClient) Iter(ctx context.Context, dir string, f func(string) e

// Get implements objstore.Bucket.
func (b *SSEBucketClient) Get(ctx context.Context, name string) (io.ReadCloser, error) {
return b.bucket.Get(ctx, name)
r, err := b.bucket.Get(ctx, name)

if err != nil && b.IsCustomerManagedKeyError(err) {
// Store gateway will return the status if the returned error is an `status.Error`
return nil, cortex_errors.WithCause(err, status.Error(codes.PermissionDenied, err.Error()))
}

return r, err
}

// GetRange implements objstore.Bucket.
func (b *SSEBucketClient) GetRange(ctx context.Context, name string, off, length int64) (io.ReadCloser, error) {
return b.bucket.GetRange(ctx, name, off, length)
r, err := b.bucket.GetRange(ctx, name, off, length)
if err != nil && b.IsCustomerManagedKeyError(err) {
return nil, cortex_errors.WithCause(err, status.Error(codes.PermissionDenied, err.Error()))
}

return r, err
}

// Exists implements objstore.Bucket.
Expand All @@ -119,7 +135,12 @@ func (b *SSEBucketClient) IsObjNotFoundErr(err error) bool {
return b.bucket.IsObjNotFoundErr(err)
}

// IsCustomerManagedKeyError implements objstore.Bucket.
func (b *SSEBucketClient) IsCustomerManagedKeyError(err error) bool {
// unwrap error
if se, ok := err.(interface{ Err() error }); ok {
return b.bucket.IsCustomerManagedKeyError(se.Err()) || b.bucket.IsCustomerManagedKeyError(err)
}
return b.bucket.IsCustomerManagedKeyError(err)
}

Expand Down
13 changes: 13 additions & 0 deletions pkg/storage/bucket/sse_bucket_client_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -106,6 +106,19 @@ func TestSSEBucketClient_Upload_ShouldInjectCustomSSEConfig(t *testing.T) {
}
}

func Test_shouldWrapSSeErrors(t *testing.T) {
cfgProvider := &mockTenantConfigProvider{}

bkt := &ClientMock{}

bkt.MockGet("Test", "someContent", errKeyPermissionDenied)

sseBkt := NewSSEBucketClient("user-1", bkt, cfgProvider)

_, err := sseBkt.Get(context.Background(), "Test")
require.True(t, sseBkt.IsCustomerManagedKeyError(err))
}

type mockTenantConfigProvider struct {
s3SseType string
s3KmsKeyID string
Expand Down
16 changes: 15 additions & 1 deletion pkg/storage/tsdb/testutil/objstore.go
Original file line number Diff line number Diff line change
Expand Up @@ -51,6 +51,20 @@ func (m *MockBucketFailure) Delete(ctx context.Context, name string) error {
return m.Bucket.Delete(ctx, name)
}

func (m *MockBucketFailure) GetRange(ctx context.Context, name string, off, length int64) (io.ReadCloser, error) {
m.GetCalls.Add(1)
for prefix, err := range m.GetFailures {
if strings.HasPrefix(name, prefix) {
return nil, err
}
}
if e, ok := m.GetFailures[name]; ok {
return nil, e
}

return m.Bucket.GetRange(ctx, name, off, length)
}

func (m *MockBucketFailure) Get(ctx context.Context, name string) (io.ReadCloser, error) {
m.GetCalls.Add(1)
for prefix, err := range m.GetFailures {
Expand Down Expand Up @@ -96,5 +110,5 @@ func (m *MockBucketFailure) ReaderWithExpectedErrs(expectedFunc objstore.IsOpFai
}

func (m *MockBucketFailure) IsCustomerManagedKeyError(err error) bool {
return errors.Is(err, ErrKeyAccessDeniedError)
return errors.Is(errors.Cause(err), ErrKeyAccessDeniedError)
}
27 changes: 9 additions & 18 deletions pkg/storegateway/bucket_stores.go
Original file line number Diff line number Diff line change
Expand Up @@ -299,25 +299,22 @@ func (u *BucketStores) Series(req *storepb.SeriesRequest, srv storepb.Store_Seri
}

store := u.getStore(userID)
userBkt := bucket.NewUserBucketClient(userID, u.bucket, u.limits)
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

We are initializing a new bkt client every Series request? Can we just use u.bucket here?

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I had to do this to unwrap the error, but creating this here is just creating an object (it does not do anything).

if store == nil {
return nil
}

err := u.getStoreError(userID)

if err != nil && cortex_errors.ErrorIs(err, u.bucket.IsCustomerManagedKeyError) {
return httpgrpc.Errorf(int(codes.ResourceExhausted), "store error: %s", err)
if err != nil && cortex_errors.ErrorIs(err, userBkt.IsCustomerManagedKeyError) {
return httpgrpc.Errorf(int(codes.PermissionDenied), "store error: %s", err)
}

err = store.Series(req, spanSeriesServer{
Store_SeriesServer: srv,
ctx: spanCtx,
})

if err != nil && cortex_errors.ErrorIs(err, u.bucket.IsCustomerManagedKeyError) {
return httpgrpc.Errorf(int(codes.ResourceExhausted), "store error: %s", err)
}

return err
}

Expand All @@ -332,22 +329,19 @@ func (u *BucketStores) LabelNames(ctx context.Context, req *storepb.LabelNamesRe
}

store := u.getStore(userID)
userBkt := bucket.NewUserBucketClient(userID, u.bucket, u.limits)
if store == nil {
return &storepb.LabelNamesResponse{}, nil
}

err := u.getStoreError(userID)

if err != nil && cortex_errors.ErrorIs(err, u.bucket.IsCustomerManagedKeyError) {
return nil, httpgrpc.Errorf(int(codes.ResourceExhausted), "store error: %s", err)
if err != nil && cortex_errors.ErrorIs(err, userBkt.IsCustomerManagedKeyError) {
return nil, httpgrpc.Errorf(int(codes.PermissionDenied), "store error: %s", err)
}

resp, err := store.LabelNames(ctx, req)

if err != nil && cortex_errors.ErrorIs(err, u.bucket.IsCustomerManagedKeyError) {
return resp, httpgrpc.Errorf(int(codes.ResourceExhausted), "store error: %s", err)
}

return resp, err
}

Expand All @@ -362,22 +356,19 @@ func (u *BucketStores) LabelValues(ctx context.Context, req *storepb.LabelValues
}

store := u.getStore(userID)
userBkt := bucket.NewUserBucketClient(userID, u.bucket, u.limits)
if store == nil {
return &storepb.LabelValuesResponse{}, nil
}

err := u.getStoreError(userID)

if err != nil && cortex_errors.ErrorIs(err, u.bucket.IsCustomerManagedKeyError) {
return nil, httpgrpc.Errorf(int(codes.ResourceExhausted), "store error: %s", err)
if err != nil && cortex_errors.ErrorIs(err, userBkt.IsCustomerManagedKeyError) {
return nil, httpgrpc.Errorf(int(codes.PermissionDenied), "store error: %s", err)
}

resp, err := store.LabelValues(ctx, req)

if err != nil && cortex_errors.ErrorIs(err, u.bucket.IsCustomerManagedKeyError) {
return resp, httpgrpc.Errorf(int(codes.ResourceExhausted), "store error: %s", err)
}

return resp, err
}

Expand Down
Loading