Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -51,7 +51,7 @@ require (
github.com/sony/gobreaker v0.5.0
github.com/spf13/afero v1.9.5
github.com/stretchr/testify v1.8.4
github.com/thanos-io/objstore v0.0.0-20230522103316-23ebe2eacadd
github.com/thanos-io/objstore v0.0.0-20230629211010-ff1b35b9841a
github.com/thanos-io/promql-engine v0.0.0-20230526105742-791d78b260ea
github.com/thanos-io/thanos v0.31.1-0.20230627154113-7cfaf3fe2d43
github.com/uber/jaeger-client-go v2.30.0+incompatible
Expand Down
4 changes: 2 additions & 2 deletions go.sum
Original file line number Diff line number Diff line change
Expand Up @@ -1160,8 +1160,8 @@ github.com/stretchr/testify v1.8.4/go.mod h1:sz/lmYIOXD/1dqDmKjjqLyZ2RngseejIcXl
github.com/tencentyun/cos-go-sdk-v5 v0.7.40 h1:W6vDGKCHe4wBACI1d2UgE6+50sJFhRWU4O8IB2ozzxM=
github.com/thanos-community/galaxycache v0.0.0-20211122094458-3a32041a1f1e h1:f1Zsv7OAU9iQhZwigp50Yl38W10g/vd5NC8Rdk1Jzng=
github.com/thanos-community/galaxycache v0.0.0-20211122094458-3a32041a1f1e/go.mod h1:jXcofnrSln/cLI6/dhlBxPQZEEQHVPCcFaH75M+nSzM=
github.com/thanos-io/objstore v0.0.0-20230522103316-23ebe2eacadd h1:asQ0HomkaUXZuR3J7daBEusMS++3hkYsYM6u8gpmPWM=
github.com/thanos-io/objstore v0.0.0-20230522103316-23ebe2eacadd/go.mod h1:5V7lzXuaxwt6XFQoA/zJrhdnQrxq1+r0bwQ1iYOq3gM=
github.com/thanos-io/objstore v0.0.0-20230629211010-ff1b35b9841a h1:tXcVeuval1nzdHn1JXqaBmyjuEUcpDI9huPrUF04nR4=
github.com/thanos-io/objstore v0.0.0-20230629211010-ff1b35b9841a/go.mod h1:5V7lzXuaxwt6XFQoA/zJrhdnQrxq1+r0bwQ1iYOq3gM=
github.com/thanos-io/promql-engine v0.0.0-20230526105742-791d78b260ea h1:kzK8sBn2+mo3NAxP+XjAjAqr1hwfxxFUy5CybaBkjAI=
github.com/thanos-io/promql-engine v0.0.0-20230526105742-791d78b260ea/go.mod h1:eIgPaXWgOhNAv6CPPrgu09r0AtT7byBTZy+7WkX0D18=
github.com/thanos-io/thanos v0.31.1-0.20230627154113-7cfaf3fe2d43 h1:UHyTPGdDHAoNHuSce5cJ2vEi6g1v8D5ZFBWZ61uTHSM=
Expand Down
4 changes: 4 additions & 0 deletions pkg/compactor/blocks_cleaner.go
Original file line number Diff line number Diff line change
Expand Up @@ -325,6 +325,10 @@ func (c *BlocksCleaner) cleanUser(ctx context.Context, userID string, firstRun b
idx, err := bucketindex.ReadIndex(ctx, c.bucketClient, userID, c.cfgProvider, c.logger)
if errors.Is(err, bucketindex.ErrIndexCorrupted) {
level.Warn(userLogger).Log("msg", "found a corrupted bucket index, recreating it")
} else if errors.Is(err, bucket.ErrCustomerManagedKeyAccessDenied) {
// Give up cleaning if we get access denied
level.Warn(userLogger).Log("msg", err.Error())
return nil
} else if err != nil && !errors.Is(err, bucketindex.ErrIndexNotFound) {
return err
}
Expand Down
49 changes: 32 additions & 17 deletions pkg/compactor/blocks_cleaner_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,6 @@ package compactor
import (
"context"
"crypto/rand"
"errors"
"fmt"
"path"
"strings"
Expand All @@ -17,14 +16,12 @@ import (
prom_testutil "github.com/prometheus/client_golang/prometheus/testutil"
"github.com/stretchr/testify/assert"
"github.com/stretchr/testify/require"
"github.com/thanos-io/objstore"
"github.com/thanos-io/thanos/pkg/block"
"github.com/thanos-io/thanos/pkg/block/metadata"

"github.com/cortexproject/cortex/pkg/storage/tsdb"
"github.com/cortexproject/cortex/pkg/storage/tsdb/bucketindex"
cortex_testutil "github.com/cortexproject/cortex/pkg/storage/tsdb/testutil"
"github.com/cortexproject/cortex/pkg/util"
"github.com/cortexproject/cortex/pkg/util/services"
)

Expand Down Expand Up @@ -57,6 +54,37 @@ func TestBlocksCleaner(t *testing.T) {
}
}

func TestBlockCleaner_KeyPermissionDenied(t *testing.T) {
const userID = "user-1"

bucketClient, _ := cortex_testutil.PrepareFilesystemBucket(t)
bucketClient = bucketindex.BucketWithGlobalMarkers(bucketClient)

// Create blocks.
ctx := context.Background()
deletionDelay := 12 * time.Hour
bucketClient = &cortex_testutil.MockBucketFailure{
Bucket: bucketClient,
GetFailures: map[string]error{
path.Join(userID, "bucket-index.json.gz"): cortex_testutil.ErrKeyAccessDeniedError,
},
}

cfg := BlocksCleanerConfig{
DeletionDelay: deletionDelay,
CleanupInterval: time.Minute,
CleanupConcurrency: 1,
}

logger := log.NewNopLogger()
scanner := tsdb.NewUsersScanner(bucketClient, tsdb.AllUsers, logger)
cfgProvider := newMockConfigProvider()

cleaner := NewBlocksCleaner(cfg, bucketClient, scanner, cfgProvider, logger, nil)
err := cleaner.cleanUser(ctx, userID, true)
require.NoError(t, err)
}

func testBlocksCleanerWithOptions(t *testing.T, options testBlocksCleanerOptions) {
bucketClient, _ := cortex_testutil.PrepareFilesystemBucket(t)

Expand Down Expand Up @@ -254,7 +282,7 @@ func TestBlocksCleaner_ShouldContinueOnBlockDeletionFailure(t *testing.T) {
createDeletionMark(t, bucketClient, userID, block4, now.Add(-deletionDelay).Add(-time.Hour))

// To emulate a failure deleting a block, we wrap the bucket client in a mocked one.
bucketClient = &mockBucketFailure{
bucketClient = &cortex_testutil.MockBucketFailure{
Bucket: bucketClient,
DeleteFailures: []string{path.Join(userID, block3.String(), metadata.MetaFilename)},
}
Expand Down Expand Up @@ -658,19 +686,6 @@ func TestBlocksCleaner_ShouldRemoveBlocksOutsideRetentionPeriod(t *testing.T) {
}
}

type mockBucketFailure struct {
objstore.Bucket

DeleteFailures []string
}

func (m *mockBucketFailure) Delete(ctx context.Context, name string) error {
if util.StringsContain(m.DeleteFailures, name) {
return errors.New("mocked delete failure")
}
return m.Bucket.Delete(ctx, name)
}

type mockConfigProvider struct {
userRetentionPeriods map[string]time.Duration
}
Expand Down
7 changes: 7 additions & 0 deletions pkg/querier/blocks_finder_bucket_index.go
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,8 @@ import (
"github.com/prometheus/client_golang/prometheus"
"github.com/thanos-io/objstore"

"github.com/cortexproject/cortex/pkg/util/validation"

"github.com/cortexproject/cortex/pkg/storage/bucket"
"github.com/cortexproject/cortex/pkg/storage/tsdb/bucketindex"
"github.com/cortexproject/cortex/pkg/util/services"
Expand Down Expand Up @@ -62,6 +64,11 @@ func (f *BucketIndexBlocksFinder) GetBlocks(ctx context.Context, userID string,
// so the bucket index hasn't been created yet.
return nil, nil, nil
}

if errors.Is(err, bucket.ErrCustomerManagedKeyAccessDenied) {
return nil, nil, validation.AccessDeniedError(err.Error())
}

if err != nil {
return nil, nil, err
}
Expand Down
20 changes: 20 additions & 0 deletions pkg/querier/blocks_finder_bucket_index_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,8 @@ import (
"github.com/stretchr/testify/require"
"github.com/thanos-io/objstore"

"github.com/cortexproject/cortex/pkg/util/validation"

"github.com/cortexproject/cortex/pkg/storage/tsdb/bucketindex"
cortex_testutil "github.com/cortexproject/cortex/pkg/storage/tsdb/testutil"
"github.com/cortexproject/cortex/pkg/util/services"
Expand Down Expand Up @@ -241,3 +243,21 @@ func prepareBucketIndexBlocksFinder(t testing.TB, bkt objstore.Bucket) *BucketIn

return finder
}

func TestBucketIndexBlocksFinder_GetBlocks_KeyPermissionDenied(t *testing.T) {
const userID = "user-1"
bkt, _ := cortex_testutil.PrepareFilesystemBucket(t)

bkt = &cortex_testutil.MockBucketFailure{
Bucket: bkt,
GetFailures: map[string]error{
path.Join(userID, "bucket-index.json.gz"): cortex_testutil.ErrKeyAccessDeniedError,
},
}

finder := prepareBucketIndexBlocksFinder(t, bkt)

_, _, err := finder.GetBlocks(context.Background(), userID, 0, 100)
expected := validation.AccessDeniedError("error")
require.IsType(t, expected, err)
}
3 changes: 3 additions & 0 deletions pkg/querier/error_translate_queryable.go
Original file line number Diff line number Diff line change
Expand Up @@ -39,6 +39,9 @@ func TranslateToPromqlAPIError(err error) error {
case validation.LimitError:
// This will be returned with status code 422 by Prometheus API.
return err
case validation.AccessDeniedError:
// This will be returned with status code 422 by Prometheus API.
return err
default:
if errors.Is(err, context.Canceled) {
return err // 422
Expand Down
6 changes: 6 additions & 0 deletions pkg/querier/error_translate_queryable_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -43,6 +43,12 @@ func TestApiStatusCodes(t *testing.T) {
expectedCode: 422,
},

{
err: validation.AccessDeniedError("access denied"),
expectedString: "access denied",
expectedCode: 422,
},

{
err: promql.ErrTooManySamples("query execution"),
expectedString: "too many samples",
Expand Down
2 changes: 2 additions & 0 deletions pkg/storage/bucket/client.go
Original file line number Diff line number Diff line change
Expand Up @@ -40,6 +40,8 @@ var (
SupportedBackends = []string{S3, GCS, Azure, Swift, Filesystem}

ErrUnsupportedStorageBackend = errors.New("unsupported storage backend")

ErrCustomerManagedKeyAccessDenied = errors.New("access denied: customer key")
)

// Config holds configuration for accessing long-term storage.
Expand Down
10 changes: 9 additions & 1 deletion pkg/storage/bucket/client_mock.go
Original file line number Diff line number Diff line change
Expand Up @@ -12,7 +12,10 @@ import (
"github.com/thanos-io/objstore"
)

var errObjectDoesNotExist = errors.New("object does not exist")
var (
errObjectDoesNotExist = errors.New("object does not exist")
errKeyPermissionDenied = errors.New("object key permission denied")
)

// ClientMock mocks objstore.Bucket
type ClientMock struct {
Expand Down Expand Up @@ -175,6 +178,11 @@ func (m *ClientMock) IsObjNotFoundErr(err error) bool {
return err == errObjectDoesNotExist
}

// IsCustomerManagedKeyError mocks objstore.Bucket.IsCustomerManagedKeyError()
func (m *ClientMock) IsCustomerManagedKeyError(err error) bool {
return err == errKeyPermissionDenied
}

// ObjectSize mocks objstore.Bucket.Attributes()
func (m *ClientMock) Attributes(ctx context.Context, name string) (objstore.ObjectAttributes, error) {
args := m.Called(ctx, name)
Expand Down
9 changes: 7 additions & 2 deletions pkg/storage/bucket/prefixed_bucket_client.go
Original file line number Diff line number Diff line change
Expand Up @@ -73,18 +73,23 @@ func (b *PrefixedBucketClient) IsObjNotFoundErr(err error) bool {
return b.bucket.IsObjNotFoundErr(err)
}

// IsCustomerManagedKeyError returns true if the permissions for key used to encrypt the object was revoked.
func (b *PrefixedBucketClient) IsCustomerManagedKeyError(err error) bool {
return b.bucket.IsCustomerManagedKeyError(err)
}

// Attributes returns attributes of the specified object.
func (b *PrefixedBucketClient) Attributes(ctx context.Context, name string) (objstore.ObjectAttributes, error) {
return b.bucket.Attributes(ctx, b.fullName(name))
}

// WithExpectedErrs allows to specify a filter that marks certain errors as expected, so it will not increment
// ReaderWithExpectedErrs allows to specify a filter that marks certain errors as expected, so it will not increment
// thanos_objstore_bucket_operation_failures_total metric.
func (b *PrefixedBucketClient) ReaderWithExpectedErrs(fn objstore.IsOpFailureExpectedFunc) objstore.BucketReader {
return b.WithExpectedErrs(fn)
}

// ReaderWithExpectedErrs allows to specify a filter that marks certain errors as expected, so it will not increment
// WithExpectedErrs allows to specify a filter that marks certain errors as expected, so it will not increment
// thanos_objstore_bucket_operation_failures_total metric.
func (b *PrefixedBucketClient) WithExpectedErrs(fn objstore.IsOpFailureExpectedFunc) objstore.Bucket {
if ib, ok := b.bucket.(objstore.InstrumentedBucket); ok {
Expand Down
6 changes: 5 additions & 1 deletion pkg/storage/bucket/s3/bucket_client.go
Original file line number Diff line number Diff line change
Expand Up @@ -115,7 +115,7 @@ func (b *BucketWithRetries) retry(ctx context.Context, f func() error, operation
if lastErr == nil {
return nil
}
if b.bucket.IsObjNotFoundErr(lastErr) {
if b.bucket.IsObjNotFoundErr(lastErr) || b.bucket.IsCustomerManagedKeyError(lastErr) {
return lastErr
}
retries.Wait()
Expand Down Expand Up @@ -194,6 +194,10 @@ func (b *BucketWithRetries) IsObjNotFoundErr(err error) bool {
return b.bucket.IsObjNotFoundErr(err)
}

func (b *BucketWithRetries) IsCustomerManagedKeyError(err error) bool {
return b.bucket.IsCustomerManagedKeyError(err)
}

func (b *BucketWithRetries) Close() error {
return b.bucket.Close()
}
57 changes: 55 additions & 2 deletions pkg/storage/bucket/s3/bucket_client_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@ package s3
import (
"bytes"
"context"
"errors"
"fmt"
"io"
"testing"
Expand All @@ -13,6 +14,49 @@ import (
"github.com/thanos-io/objstore"
)

var (
errNotFound = errors.New("not found")
errKeyDenied = errors.New("key denied")
)

func TestBucketWithRetries_ShouldRetry(t *testing.T) {
t.Parallel()

cases := map[string]struct {
err error
retryCount int
}{
"should not retry on not found": {
err: errNotFound,
retryCount: 1,
},
"should not retry on key access denied": {
err: errKeyDenied,
retryCount: 1,
},
}

for name, tc := range cases {
t.Run(name, func(*testing.T) {
m := mockBucket{
FailCount: 3,
errToReturn: tc.err,
}

b := BucketWithRetries{
logger: log.NewNopLogger(),
bucket: &m,
operationRetries: 5,
retryMinBackoff: 10 * time.Millisecond,
retryMaxBackoff: time.Second,
}

_, _ = b.Get(context.Background(), "something")
require.Equal(t, 1, m.calledCount)
})
}
}

func TestBucketWithRetries_UploadSeekable(t *testing.T) {
t.Parallel()

Expand Down Expand Up @@ -102,6 +146,9 @@ func (f *fakeReader) Read(p []byte) (n int, err error) {
type mockBucket struct {
FailCount int
uploadedContent []byte
errToReturn error

calledCount int
}

// Upload mocks objstore.Bucket.Upload()
Expand Down Expand Up @@ -135,7 +182,8 @@ func (m *mockBucket) Iter(ctx context.Context, dir string, f func(string) error,

// Get mocks objstore.Bucket.Get()
func (m *mockBucket) Get(ctx context.Context, name string) (io.ReadCloser, error) {
return nil, nil
m.calledCount++
return nil, m.errToReturn
}

// GetRange mocks objstore.Bucket.GetRange()
Expand All @@ -150,7 +198,12 @@ func (m *mockBucket) Exists(ctx context.Context, name string) (bool, error) {

// IsObjNotFoundErr mocks objstore.Bucket.IsObjNotFoundErr()
func (m *mockBucket) IsObjNotFoundErr(err error) bool {
return false
return err == errNotFound
}

// IsCustomerManagedKeyError mocks objstore.Bucket.IsCustomerManagedKeyError()
func (m *mockBucket) IsCustomerManagedKeyError(err error) bool {
return err == errKeyDenied
}

// ObjectSize mocks objstore.Bucket.Attributes()
Expand Down
4 changes: 4 additions & 0 deletions pkg/storage/bucket/sse_bucket_client.go
Original file line number Diff line number Diff line change
Expand Up @@ -119,6 +119,10 @@ func (b *SSEBucketClient) IsObjNotFoundErr(err error) bool {
return b.bucket.IsObjNotFoundErr(err)
}

func (b *SSEBucketClient) IsCustomerManagedKeyError(err error) bool {
return b.bucket.IsCustomerManagedKeyError(err)
}

// Attributes implements objstore.Bucket.
func (b *SSEBucketClient) Attributes(ctx context.Context, name string) (objstore.ObjectAttributes, error) {
return b.bucket.Attributes(ctx, name)
Expand Down
4 changes: 3 additions & 1 deletion pkg/storage/tsdb/bucketindex/loader.go
Original file line number Diff line number Diff line change
Expand Up @@ -115,6 +115,8 @@ func (l *Loader) GetIndex(ctx context.Context, userID string) (*Index, error) {

if errors.Is(err, ErrIndexNotFound) {
level.Warn(l.logger).Log("msg", "bucket index not found", "user", userID)
} else if errors.Is(err, bucket.ErrCustomerManagedKeyAccessDenied) {
level.Warn(l.logger).Log("msg", "key access denied when reading bucket index", "user", userID)
} else {
// We don't track ErrIndexNotFound as failure because it's a legit case (eg. a tenant just
// started to remote write and its blocks haven't uploaded to storage yet).
Expand Down Expand Up @@ -196,7 +198,7 @@ func (l *Loader) updateCachedIndex(ctx context.Context, userID string) {
l.loadAttempts.Inc()
startTime := time.Now()
idx, err := ReadIndex(readCtx, l.bkt, userID, l.cfgProvider, l.logger)
if err != nil && !errors.Is(err, ErrIndexNotFound) {
if err != nil && !errors.Is(err, ErrIndexNotFound) && !errors.Is(err, bucket.ErrCustomerManagedKeyAccessDenied) {
l.loadFailures.Inc()
level.Warn(l.logger).Log("msg", "unable to update bucket index", "user", userID, "err", err)
return
Expand Down
Loading