Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -53,6 +53,7 @@
* [ENHANCEMENT] Compactor: allow unregisteronshutdown to be configurable. #5503
* [ENHANCEMENT] Store Gateway: add metric `cortex_bucket_store_chunk_refetches_total` for number of chunk refetches. #5532
* [ENHANCEMENT] BasicLifeCycler: allow final-sleep during shutdown #5517
* [ENHANCEMENT] All: Handling CMK Access Denied errors. #5420 #5542
* [BUGFIX] Ruler: Validate if rule group can be safely converted back to rule group yaml from protobuf message #5265
* [BUGFIX] Querier: Convert gRPC `ResourceExhausted` status code from store gateway to 422 limit error. #5286
* [BUGFIX] Alertmanager: Route web-ui requests to the alertmanager distributor when sharding is enabled. #5293
Expand Down
3 changes: 2 additions & 1 deletion pkg/alertmanager/alertspb/compat.go
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,8 @@ package alertspb
import "errors"

var (
ErrNotFound = errors.New("alertmanager storage object not found")
ErrNotFound = errors.New("alertmanager storage object not found")
ErrAccessDenied = errors.New("alertmanager storage object access denied")
)

// ToProto transforms a yaml Alertmanager config and map of template files to an AlertConfigDesc
Expand Down
30 changes: 21 additions & 9 deletions pkg/alertmanager/alertstore/bucketclient/bucket_client.go
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,8 @@ import (
"github.com/pkg/errors"
"github.com/thanos-io/objstore"

"github.com/cortexproject/cortex/pkg/storage/tsdb"

"github.com/cortexproject/cortex/pkg/alertmanager/alertspb"
"github.com/cortexproject/cortex/pkg/storage/bucket"
"github.com/cortexproject/cortex/pkg/util/concurrency"
Expand Down Expand Up @@ -76,8 +78,8 @@ func (s *BucketAlertStore) GetAlertConfigs(ctx context.Context, userIDs []string
err := concurrency.ForEach(ctx, concurrency.CreateJobsFromStrings(userIDs), fetchConcurrency, func(ctx context.Context, job interface{}) error {
userID := job.(string)

cfg, err := s.getAlertConfig(ctx, userID)
if s.alertsBucket.IsObjNotFoundErr(err) {
cfg, uBucket, err := s.getAlertConfig(ctx, userID)
if uBucket.IsObjNotFoundErr(err) || uBucket.IsAccessDeniedErr(err) {
return nil
} else if err != nil {
return errors.Wrapf(err, "failed to fetch alertmanager config for user %s", userID)
Expand All @@ -95,11 +97,15 @@ func (s *BucketAlertStore) GetAlertConfigs(ctx context.Context, userIDs []string

// GetAlertConfig implements alertstore.AlertStore.
func (s *BucketAlertStore) GetAlertConfig(ctx context.Context, userID string) (alertspb.AlertConfigDesc, error) {
cfg, err := s.getAlertConfig(ctx, userID)
if s.alertsBucket.IsObjNotFoundErr(err) {
cfg, uBucket, err := s.getAlertConfig(ctx, userID)
if uBucket.IsObjNotFoundErr(err) {
return cfg, alertspb.ErrNotFound
}

if uBucket.IsAccessDeniedErr(err) {
return cfg, alertspb.ErrAccessDenied
}

return cfg, err
}

Expand Down Expand Up @@ -142,10 +148,14 @@ func (s *BucketAlertStore) GetFullState(ctx context.Context, userID string) (ale
fs := alertspb.FullStateDesc{}

err := s.get(ctx, bkt, fullStateName, &fs)
if s.amBucket.IsObjNotFoundErr(err) {
if bkt.IsObjNotFoundErr(err) {
return fs, alertspb.ErrNotFound
}

if bkt.IsAccessDeniedErr(err) {
return fs, alertspb.ErrAccessDenied
}

return fs, err
}

Expand All @@ -172,10 +182,11 @@ func (s *BucketAlertStore) DeleteFullState(ctx context.Context, userID string) e
return err
}

func (s *BucketAlertStore) getAlertConfig(ctx context.Context, userID string) (alertspb.AlertConfigDesc, error) {
func (s *BucketAlertStore) getAlertConfig(ctx context.Context, userID string) (alertspb.AlertConfigDesc, objstore.Bucket, error) {
config := alertspb.AlertConfigDesc{}
err := s.get(ctx, s.getUserBucket(userID), userID, &config)
return config, err
userBkt := s.getUserBucket(userID)
err := s.get(ctx, userBkt, userID, &config)
return config, userBkt, err
}

func (s *BucketAlertStore) get(ctx context.Context, bkt objstore.Bucket, name string, msg proto.Message) error {
Expand Down Expand Up @@ -205,5 +216,6 @@ func (s *BucketAlertStore) getUserBucket(userID string) objstore.Bucket {
}

func (s *BucketAlertStore) getAlertmanagerUserBucket(userID string) objstore.Bucket {
return bucket.NewUserBucketClient(userID, s.amBucket, s.cfgProvider).WithExpectedErrs(s.amBucket.IsObjNotFoundErr)
uBucket := bucket.NewUserBucketClient(userID, s.amBucket, s.cfgProvider)
return uBucket.WithExpectedErrs(tsdb.IsOneOfTheExpectedErrors(uBucket.IsAccessDeniedErr, uBucket.IsObjNotFoundErr))
}
71 changes: 61 additions & 10 deletions pkg/alertmanager/alertstore/store_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,8 @@ package alertstore

import (
"context"
"fmt"
"io"
"testing"

"github.com/go-kit/log"
Expand All @@ -14,8 +16,12 @@ import (
"github.com/cortexproject/cortex/pkg/alertmanager/alertstore/bucketclient"
)

// errAccessDenied is the sentinel error the tests in this file assign to
// mockBucket.err to simulate an access-denied failure when reading from the bucket.
var (
errAccessDenied = fmt.Errorf("access denied")
)

func TestAlertStore_ListAllUsers(t *testing.T) {
runForEachAlertStore(t, func(t *testing.T, store AlertStore, client interface{}) {
runForEachAlertStore(t, func(t *testing.T, store AlertStore, m *mockBucket, client interface{}) {
ctx := context.Background()
user1Cfg := alertspb.AlertConfigDesc{User: "user-1", RawConfig: "content-1"}
user2Cfg := alertspb.AlertConfigDesc{User: "user-2", RawConfig: "content-2"}
Expand All @@ -40,7 +46,7 @@ func TestAlertStore_ListAllUsers(t *testing.T) {
}

func TestAlertStore_SetAndGetAlertConfig(t *testing.T) {
runForEachAlertStore(t, func(t *testing.T, store AlertStore, client interface{}) {
runForEachAlertStore(t, func(t *testing.T, store AlertStore, m *mockBucket, client interface{}) {
ctx := context.Background()
user1Cfg := alertspb.AlertConfigDesc{User: "user-1", RawConfig: "content-1"}
user2Cfg := alertspb.AlertConfigDesc{User: "user-2", RawConfig: "content-2"}
Expand Down Expand Up @@ -78,7 +84,7 @@ func TestAlertStore_SetAndGetAlertConfig(t *testing.T) {
}

func TestStore_GetAlertConfigs(t *testing.T) {
runForEachAlertStore(t, func(t *testing.T, store AlertStore, client interface{}) {
runForEachAlertStore(t, func(t *testing.T, store AlertStore, m *mockBucket, client interface{}) {
ctx := context.Background()
user1Cfg := alertspb.AlertConfigDesc{User: "user-1", RawConfig: "content-1"}
user2Cfg := alertspb.AlertConfigDesc{User: "user-2", RawConfig: "content-2"}
Expand All @@ -90,6 +96,15 @@ func TestStore_GetAlertConfigs(t *testing.T) {
assert.Empty(t, configs)
}

// Treat Access denied as empty.
{
m.err = errAccessDenied
configs, err := store.GetAlertConfigs(ctx, []string{"user-1", "user-2"})
require.NoError(t, err)
assert.Empty(t, configs)
m.err = nil
}

// The storage contains some configs.
{
require.NoError(t, store.SetAlertConfig(ctx, user1Cfg))
Expand All @@ -114,7 +129,7 @@ func TestStore_GetAlertConfigs(t *testing.T) {
}

func TestAlertStore_DeleteAlertConfig(t *testing.T) {
runForEachAlertStore(t, func(t *testing.T, store AlertStore, client interface{}) {
runForEachAlertStore(t, func(t *testing.T, store AlertStore, m *mockBucket, client interface{}) {
ctx := context.Background()
user1Cfg := alertspb.AlertConfigDesc{User: "user-1", RawConfig: "content-1"}
user2Cfg := alertspb.AlertConfigDesc{User: "user-2", RawConfig: "content-2"}
Expand All @@ -132,6 +147,12 @@ func TestAlertStore_DeleteAlertConfig(t *testing.T) {
require.NoError(t, err)
assert.Equal(t, user2Cfg, config)

// Ensure we capture access denied error
m.err = errAccessDenied
_, err = store.GetAlertConfig(ctx, "user-1")
assert.Equal(t, alertspb.ErrAccessDenied, err)
m.err = nil

// Delete the config for user-1.
require.NoError(t, store.DeleteAlertConfig(ctx, "user-1"))

Expand All @@ -148,26 +169,27 @@ func TestAlertStore_DeleteAlertConfig(t *testing.T) {
})
}

func runForEachAlertStore(t *testing.T, testFn func(t *testing.T, store AlertStore, client interface{})) {
func runForEachAlertStore(t *testing.T, testFn func(t *testing.T, store AlertStore, b *mockBucket, client interface{})) {
bucketClient := objstore.NewInMemBucket()
bucketStore := bucketclient.NewBucketAlertStore(bucketClient, nil, log.NewNopLogger())
mBucketClient := &mockBucket{Bucket: bucketClient}
bucketStore := bucketclient.NewBucketAlertStore(mBucketClient, nil, log.NewNopLogger())

stores := map[string]struct {
store AlertStore
client interface{}
}{
"bucket": {store: bucketStore, client: bucketClient},
"bucket": {store: bucketStore, client: mBucketClient},
}

for name, data := range stores {
t.Run(name, func(t *testing.T) {
testFn(t, data.store, data.client)
testFn(t, data.store, mBucketClient, data.client)
})
}
}

func objectExists(bucketClient interface{}, key string) (bool, error) {
if typed, ok := bucketClient.(*objstore.InMemBucket); ok {
if typed, ok := bucketClient.(objstore.Bucket); ok {
return typed.Exists(context.Background(), key)
}

Expand All @@ -189,7 +211,8 @@ func makeTestFullState(content string) alertspb.FullStateDesc {

func TestBucketAlertStore_GetSetDeleteFullState(t *testing.T) {
bucket := objstore.NewInMemBucket()
store := bucketclient.NewBucketAlertStore(bucket, nil, log.NewNopLogger())
mBucketClient := &mockBucket{Bucket: bucket}
store := bucketclient.NewBucketAlertStore(mBucketClient, nil, log.NewNopLogger())
ctx := context.Background()

state1 := makeTestFullState("one")
Expand All @@ -208,6 +231,18 @@ func TestBucketAlertStore_GetSetDeleteFullState(t *testing.T) {
assert.ElementsMatch(t, []string{}, users)
}

// Test Access Denied
{
mBucketClient.err = errAccessDenied
_, err := store.GetFullState(ctx, "user-1")
assert.Equal(t, alertspb.ErrAccessDenied, err)

users, err := store.ListUsersWithFullState(ctx)
assert.NoError(t, err)
assert.ElementsMatch(t, []string{}, users)
mBucketClient.err = nil
}

// The storage contains users.
{
require.NoError(t, store.SetFullState(ctx, "user-1", state1))
Expand Down Expand Up @@ -256,3 +291,19 @@ func TestBucketAlertStore_GetSetDeleteFullState(t *testing.T) {
require.NoError(t, store.DeleteFullState(ctx, "user-1"))
}
}

// mockBucket is a test double that embeds an objstore.Bucket and overrides Get
// (and IsAccessDeniedErr) so tests can inject a read failure on demand.
type mockBucket struct {
objstore.Bucket
// err, when non-nil, is returned by Get instead of delegating to the embedded bucket.
err error
}

// Get returns the injected error when one has been set on the mock; otherwise
// it forwards the read to the embedded bucket unchanged.
func (m *mockBucket) Get(ctx context.Context, name string) (io.ReadCloser, error) {
	if injected := m.err; injected != nil {
		return nil, injected
	}

	return m.Bucket.Get(ctx, name)
}

// IsAccessDeniedErr reports whether err is the errAccessDenied sentinel.
// The check is a direct equality comparison, so only the sentinel value itself matches.
func (m *mockBucket) IsAccessDeniedErr(err error) bool {
	matchesSentinel := err == errAccessDenied
	return matchesSentinel
}
9 changes: 6 additions & 3 deletions pkg/alertmanager/api.go
Original file line number Diff line number Diff line change
Expand Up @@ -72,9 +72,12 @@ func (am *MultitenantAlertmanager) GetUserConfig(w http.ResponseWriter, r *http.

cfg, err := am.store.GetAlertConfig(r.Context(), userID)
if err != nil {
if err == alertspb.ErrNotFound {
switch {
case err == alertspb.ErrNotFound:
http.Error(w, err.Error(), http.StatusNotFound)
} else {
case err == alertspb.ErrAccessDenied:
http.Error(w, err.Error(), http.StatusForbidden)
default:
http.Error(w, err.Error(), http.StatusInternalServerError)
}
return
Expand Down Expand Up @@ -285,7 +288,7 @@ func (am *MultitenantAlertmanager) ListAllConfigs(w http.ResponseWriter, r *http

err = concurrency.ForEachUser(r.Context(), userIDs, fetchConcurrency, func(ctx context.Context, userID string) error {
cfg, err := am.store.GetAlertConfig(ctx, userID)
if errors.Is(err, alertspb.ErrNotFound) {
if errors.Is(err, alertspb.ErrNotFound) || errors.Is(err, alertspb.ErrAccessDenied) {
return nil
} else if err != nil {
return errors.Wrapf(err, "failed to fetch alertmanager config for user %s", userID)
Expand Down
6 changes: 6 additions & 0 deletions pkg/alertmanager/state_replication.go
Original file line number Diff line number Diff line change
Expand Up @@ -29,6 +29,7 @@ const (
syncFromReplica = "from-replica"
syncFromStorage = "from-storage"
syncUserNotFound = "user-not-found"
syncAccessDenied = "user-access-denied"
syncFailed = "failed"
)

Expand Down Expand Up @@ -232,6 +233,11 @@ func (s *state) starting(ctx context.Context) error {
s.initialSyncCompleted.WithLabelValues(syncUserNotFound).Inc()
return nil
}
if errors.Is(err, alertspb.ErrAccessDenied) {
level.Info(s.logger).Log("msg", "access deinied when trying to access user storage; proceeding", "user", s.userID)
s.initialSyncCompleted.WithLabelValues(syncAccessDenied).Inc()
return nil
}
if err == nil {
if err = s.mergeFullStates([]*clusterpb.FullState{fullState.State}); err == nil {
level.Info(s.logger).Log("msg", "state read from storage; proceeding")
Expand Down