diff --git a/pkg/alertmanager/alertstore/bucketclient/bucket_client.go b/pkg/alertmanager/alertstore/bucketclient/bucket_client.go
index ceeff937a7a..a8822d5e644 100644
--- a/pkg/alertmanager/alertstore/bucketclient/bucket_client.go
+++ b/pkg/alertmanager/alertstore/bucketclient/bucket_client.go
@@ -4,6 +4,7 @@ import (
 	"bytes"
 	"context"
 	"io/ioutil"
+	"strings"
 	"sync"
 
 	"github.com/go-kit/kit/log"
@@ -123,6 +124,18 @@ func (s *BucketAlertStore) DeleteAlertConfig(ctx context.Context, userID string)
 	return err
 }
 
+// ListUsersWithFullState implements alertstore.AlertStore. Each key yielded by amBucket.Iter is returned with its trailing "/" removed.
+func (s *BucketAlertStore) ListUsersWithFullState(ctx context.Context) ([]string, error) {
+	var userIDs []string
+
+	err := s.amBucket.Iter(ctx, "", func(key string) error {
+		userIDs = append(userIDs, strings.TrimSuffix(key, "/"))
+		return nil
+	})
+
+	return userIDs, err
+}
+
 // GetFullState implements alertstore.AlertStore.
 func (s *BucketAlertStore) GetFullState(ctx context.Context, userID string) (alertspb.FullStateDesc, error) {
 	bkt := s.getAlertmanagerUserBucket(userID)
diff --git a/pkg/alertmanager/alertstore/configdb/store.go b/pkg/alertmanager/alertstore/configdb/store.go
index b86d9c7c74a..880af40c072 100644
--- a/pkg/alertmanager/alertstore/configdb/store.go
+++ b/pkg/alertmanager/alertstore/configdb/store.go
@@ -93,6 +93,11 @@ func (c *Store) DeleteAlertConfig(ctx context.Context, user string) error {
 	return errReadOnly
 }
 
+// ListUsersWithFullState implements alertstore.AlertStore. It always returns errState.
+func (c *Store) ListUsersWithFullState(ctx context.Context) ([]string, error) {
+	return nil, errState
+}
+
 // GetFullState implements alertstore.AlertStore.
 func (c *Store) GetFullState(ctx context.Context, user string) (alertspb.FullStateDesc, error) {
 	return alertspb.FullStateDesc{}, errState
diff --git a/pkg/alertmanager/alertstore/local/store.go b/pkg/alertmanager/alertstore/local/store.go
index 08492e074eb..53ffa7db3e6 100644
--- a/pkg/alertmanager/alertstore/local/store.go
+++ b/pkg/alertmanager/alertstore/local/store.go
@@ -101,6 +101,11 @@ func (f *Store) DeleteAlertConfig(_ context.Context, user string) error {
 	return errReadOnly
 }
 
+// ListUsersWithFullState implements alertstore.AlertStore. It always returns errState.
+func (f *Store) ListUsersWithFullState(ctx context.Context) ([]string, error) {
+	return nil, errState
+}
+
 // GetFullState implements alertstore.AlertStore.
 func (f *Store) GetFullState(ctx context.Context, user string) (alertspb.FullStateDesc, error) {
 	return alertspb.FullStateDesc{}, errState
diff --git a/pkg/alertmanager/alertstore/objectclient/store.go b/pkg/alertmanager/alertstore/objectclient/store.go
index 132e86edb3a..78128069374 100644
--- a/pkg/alertmanager/alertstore/objectclient/store.go
+++ b/pkg/alertmanager/alertstore/objectclient/store.go
@@ -142,6 +142,11 @@ func (a *AlertStore) DeleteAlertConfig(ctx context.Context, user string) error {
 	return err
 }
 
+// ListUsersWithFullState implements alertstore.AlertStore. It always returns errState.
+func (a *AlertStore) ListUsersWithFullState(ctx context.Context) ([]string, error) {
+	return nil, errState
+}
+
 // GetFullState implements alertstore.AlertStore.
 func (a *AlertStore) GetFullState(ctx context.Context, user string) (alertspb.FullStateDesc, error) {
 	return alertspb.FullStateDesc{}, errState
diff --git a/pkg/alertmanager/alertstore/store.go b/pkg/alertmanager/alertstore/store.go
index 875a9741c8c..12bba961142 100644
--- a/pkg/alertmanager/alertstore/store.go
+++ b/pkg/alertmanager/alertstore/store.go
@@ -40,6 +40,9 @@ type AlertStore interface {
 	// If configuration for the user doesn't exist, no error is reported.
 	DeleteAlertConfig(ctx context.Context, user string) error
 
+	// ListUsersWithFullState returns the list of users which have had state written.
+	ListUsersWithFullState(ctx context.Context) ([]string, error)
+
 	// GetFullState loads and returns the alertmanager state for the given user.
 	GetFullState(ctx context.Context, user string) (alertspb.FullStateDesc, error)
 
diff --git a/pkg/alertmanager/alertstore/store_test.go b/pkg/alertmanager/alertstore/store_test.go
index ea7b8722af4..2c87005a919 100644
--- a/pkg/alertmanager/alertstore/store_test.go
+++ b/pkg/alertmanager/alertstore/store_test.go
@@ -220,6 +220,10 @@ func TestBucketAlertStore_GetSetDeleteFullState(t *testing.T) {
 
 		_, err = store.GetFullState(ctx, "user-2")
 		assert.Equal(t, alertspb.ErrNotFound, err)
+
+		users, err := store.ListUsersWithFullState(ctx)
+		assert.NoError(t, err)
+		assert.ElementsMatch(t, []string{}, users)
 	}
 
 	// The storage contains users.
@@ -244,6 +248,10 @@ func TestBucketAlertStore_GetSetDeleteFullState(t *testing.T) {
 		exists, err = bucket.Exists(ctx, "alertmanager/user-2/fullstate")
 		require.NoError(t, err)
 		assert.True(t, exists)
+
+		users, err := store.ListUsersWithFullState(ctx)
+		assert.NoError(t, err)
+		assert.ElementsMatch(t, []string{"user-1", "user-2"}, users)
 	}
 
 	// The storage has had user-1 deleted.
@@ -258,6 +266,10 @@ func TestBucketAlertStore_GetSetDeleteFullState(t *testing.T) {
 		require.NoError(t, err)
 		assert.Equal(t, state2, res)
 
+		users, err := store.ListUsersWithFullState(ctx)
+		assert.NoError(t, err)
+		assert.ElementsMatch(t, []string{"user-2"}, users)
+
 		// Delete again (should be idempotent).
 		require.NoError(t, store.DeleteFullState(ctx, "user-1"))
 	}
diff --git a/pkg/alertmanager/multitenant.go b/pkg/alertmanager/multitenant.go
index d9dc4d6f156..48b1f8c9fbd 100644
--- a/pkg/alertmanager/multitenant.go
+++ b/pkg/alertmanager/multitenant.go
@@ -663,7 +663,7 @@ func (am *MultitenantAlertmanager) loadAndSyncConfigs(ctx context.Context, syncR
 	level.Info(am.logger).Log("msg", "synchronizing alertmanager configs for users")
 	am.syncTotal.WithLabelValues(syncReason).Inc()
 
-	cfgs, err := am.loadAlertmanagerConfigs(ctx)
+	allUsers, cfgs, err := am.loadAlertmanagerConfigs(ctx)
 	if err != nil {
 		am.syncFailures.WithLabelValues(syncReason).Inc()
 		return err
@@ -672,6 +672,13 @@ func (am *MultitenantAlertmanager) loadAndSyncConfigs(ctx context.Context, syncR
 	am.syncConfigs(cfgs)
 	am.deleteUnusedLocalUserState()
 
+	// Currently, remote state persistence is only used when sharding is enabled.
+	if am.cfg.ShardingEnabled {
+		// Note when cleaning up remote state, remember that the user may not necessarily be configured
+		// in this instance. Therefore, pass the list of _all_ configured users to filter by.
+		am.deleteUnusedRemoteUserState(ctx, allUsers)
+	}
+
 	return nil
 }
 
@@ -696,35 +703,35 @@ func (am *MultitenantAlertmanager) stopping(_ error) error {
 	return nil
 }
 
-// loadAlertmanagerConfigs Loads (and filters) the alertmanagers configuration from object storage, taking into consideration the sharding strategy.
-func (am *MultitenantAlertmanager) loadAlertmanagerConfigs(ctx context.Context) (map[string]alertspb.AlertConfigDesc, error) {
+// loadAlertmanagerConfigs loads (and filters) the alertmanagers configuration from object storage, taking into consideration the sharding strategy. Returns:
+// - The list of discovered users (all users with a configuration in storage)
+// - The configurations of users owned by this instance.
+func (am *MultitenantAlertmanager) loadAlertmanagerConfigs(ctx context.Context) ([]string, map[string]alertspb.AlertConfigDesc, error) {
 	// Find all users with an alertmanager config.
-	userIDs, err := am.store.ListAllUsers(ctx)
+	allUserIDs, err := am.store.ListAllUsers(ctx)
 	if err != nil {
-		return nil, errors.Wrap(err, "failed to list users with alertmanager configuration")
+		return nil, nil, errors.Wrap(err, "failed to list users with alertmanager configuration")
 	}
-	numUsersDiscovered := len(userIDs)
+	numUsersDiscovered := len(allUserIDs)
+	ownedUserIDs := make([]string, 0, len(allUserIDs))
 
 	// Filter out users not owned by this shard.
-	for i := 0; i < len(userIDs); {
-		if !am.isUserOwned(userIDs[i]) {
-			userIDs = append(userIDs[:i], userIDs[i+1:]...)
-			continue
+	for _, userID := range allUserIDs {
+		if am.isUserOwned(userID) {
+			ownedUserIDs = append(ownedUserIDs, userID)
 		}
-
-		i++
 	}
-	numUsersOwned := len(userIDs)
+	numUsersOwned := len(ownedUserIDs)
 
 	// Load the configs for the owned users.
-	configs, err := am.store.GetAlertConfigs(ctx, userIDs)
+	configs, err := am.store.GetAlertConfigs(ctx, ownedUserIDs)
 	if err != nil {
-		return nil, errors.Wrapf(err, "failed to load alertmanager configurations for owned users")
+		return nil, nil, errors.Wrap(err, "failed to load alertmanager configurations for owned users")
 	}
 
 	am.tenantsDiscovered.Set(float64(numUsersDiscovered))
 	am.tenantsOwned.Set(float64(numUsersOwned))
-	return configs, nil
+	return allUserIDs, configs, nil
 }
 
 func (am *MultitenantAlertmanager) isUserOwned(userID string) bool {
@@ -1147,6 +1154,34 @@ func (am *MultitenantAlertmanager) UpdateState(ctx context.Context, part *cluste
 	return &alertmanagerpb.UpdateStateResponse{Status: alertmanagerpb.OK}, nil
 }
 
+// deleteUnusedRemoteUserState deletes state objects in remote storage for users that are no longer configured.
+func (am *MultitenantAlertmanager) deleteUnusedRemoteUserState(ctx context.Context, allUsers []string) {
+
+	users := make(map[string]struct{}, len(allUsers))
+	for _, userID := range allUsers {
+		users[userID] = struct{}{}
+	}
+
+	usersWithState, err := am.store.ListUsersWithFullState(ctx)
+	if err != nil {
+		level.Warn(am.logger).Log("msg", "failed to list users with state", "err", err)
+		return
+	}
+
+	for _, userID := range usersWithState {
+		if _, ok := users[userID]; ok {
+			continue
+		}
+
+		err := am.store.DeleteFullState(ctx, userID)
+		if err != nil {
+			level.Warn(am.logger).Log("msg", "failed to delete remote state for user", "user", userID, "err", err)
+		} else {
+			level.Info(am.logger).Log("msg", "deleted remote state for user", "user", userID)
+		}
+	}
+}
+
 // deleteUnusedLocalUserState deletes local files for users that we no longer need.
 func (am *MultitenantAlertmanager) deleteUnusedLocalUserState() {
 	userDirs := am.getPerUserDirectories()
diff --git a/pkg/alertmanager/multitenant_test.go b/pkg/alertmanager/multitenant_test.go
index c66eac55c14..27171be3e9a 100644
--- a/pkg/alertmanager/multitenant_test.go
+++ b/pkg/alertmanager/multitenant_test.go
@@ -571,6 +571,99 @@ func TestMultitenantAlertmanager_deleteUnusedLocalUserState(t *testing.T) {
 	require.NotZero(t, dirs[user2]) // has config, files survived
 }
 
+func TestMultitenantAlertmanager_deleteUnusedRemoteUserState(t *testing.T) {
+	ctx := context.Background()
+
+	const (
+		user1 = "user1"
+		user2 = "user2"
+	)
+
+	alertStore := prepareInMemoryAlertStore()
+	ringStore := consul.NewInMemoryClient(ring.GetCodec())
+
+	createInstance := func(i int) *MultitenantAlertmanager {
+		reg := prometheus.NewPedanticRegistry()
+		cfg := mockAlertmanagerConfig(t)
+
+		cfg.ShardingRing.ReplicationFactor = 1
+		cfg.ShardingRing.InstanceID = fmt.Sprintf("instance-%d", i)
+		cfg.ShardingRing.InstanceAddr = fmt.Sprintf("127.0.0.1-%d", i)
+		cfg.ShardingEnabled = true
+
+		// Shorten the state write interval so that state gets written sooner, making the test faster.
+		cfg.Persister.Interval = 500 * time.Millisecond
+
+		am, err := createMultitenantAlertmanager(cfg, nil, nil, alertStore, ringStore, nil, log.NewLogfmtLogger(os.Stdout), reg)
+		require.NoError(t, err)
+		t.Cleanup(func() {
+			require.NoError(t, services.StopAndAwaitTerminated(ctx, am))
+		})
+		require.NoError(t, services.StartAndAwaitRunning(ctx, am))
+
+		return am
+	}
+
+	// Create two instances. With replication factor of 1, this means that only one
+	// of the instances will own the user. This tests that an instance does not delete
+	// state for users that are configured, but are owned by other instances.
+	am1 := createInstance(1)
+	am2 := createInstance(2)
+
+	// Configure the users and wait for the state persister to write some state for both.
+	{
+		require.NoError(t, alertStore.SetAlertConfig(ctx, alertspb.AlertConfigDesc{
+			User:      user1,
+			RawConfig: simpleConfigOne,
+			Templates: []*alertspb.TemplateDesc{},
+		}))
+		require.NoError(t, alertStore.SetAlertConfig(ctx, alertspb.AlertConfigDesc{
+			User:      user2,
+			RawConfig: simpleConfigOne,
+			Templates: []*alertspb.TemplateDesc{},
+		}))
+
+		err := am1.loadAndSyncConfigs(context.Background(), reasonPeriodic)
+		require.NoError(t, err)
+		err = am2.loadAndSyncConfigs(context.Background(), reasonPeriodic)
+		require.NoError(t, err)
+
+		require.Eventually(t, func() bool {
+			_, err1 := alertStore.GetFullState(context.Background(), user1)
+			_, err2 := alertStore.GetFullState(context.Background(), user2)
+			return err1 == nil && err2 == nil
+		}, 5*time.Second, 100*time.Millisecond, "timed out waiting for state to be persisted")
+	}
+
+	// Perform another sync to trigger cleanup; this should have no effect.
+	{
+		err := am1.loadAndSyncConfigs(context.Background(), reasonPeriodic)
+		require.NoError(t, err)
+		err = am2.loadAndSyncConfigs(context.Background(), reasonPeriodic)
+		require.NoError(t, err)
+
+		_, err = alertStore.GetFullState(context.Background(), user1)
+		require.NoError(t, err)
+		_, err = alertStore.GetFullState(context.Background(), user2)
+		require.NoError(t, err)
+	}
+
+	// Delete one configuration and trigger cleanup; state for only that user should be deleted.
+	{
+		require.NoError(t, alertStore.DeleteAlertConfig(ctx, user1))
+
+		err := am1.loadAndSyncConfigs(context.Background(), reasonPeriodic)
+		require.NoError(t, err)
+		err = am2.loadAndSyncConfigs(context.Background(), reasonPeriodic)
+		require.NoError(t, err)
+
+		_, err = alertStore.GetFullState(context.Background(), user1)
+		require.Equal(t, alertspb.ErrNotFound, err)
+		_, err = alertStore.GetFullState(context.Background(), user2)
+		require.NoError(t, err)
+	}
+}
+
 func createFile(t *testing.T, path string) string {
 	dir := filepath.Dir(path)
 	require.NoError(t, os.MkdirAll(dir, 0777))
@@ -834,6 +927,7 @@ func TestMultitenantAlertmanager_InitialSyncWithSharding(t *testing.T) {
 		require.True(t, am.ringLifecycler.IsRegistered())
 		require.Equal(t, ring.JOINING.String(), am.ringLifecycler.GetState().String())
 	})
+	bkt.MockIter("alertmanager/", nil, nil)
 
 	// Once successfully started, the instance should be ACTIVE in the ring.
 	require.NoError(t, services.StartAndAwaitRunning(ctx, am))
@@ -1184,6 +1278,7 @@ func TestMultitenantAlertmanager_InitialSyncFailureWithSharding(t *testing.T) {
 	// Mock the store to fail listing configs.
 	bkt := &bucket.ClientMock{}
 	bkt.MockIter("alerts/", nil, errors.New("failed to list alerts"))
+	bkt.MockIter("alertmanager/", nil, nil)
 	store := bucketclient.NewBucketAlertStore(bkt, nil, log.NewNopLogger())
 
 	am, err := createMultitenantAlertmanager(amConfig, nil, nil, store, ringStore, nil, log.NewNopLogger(), nil)