From fa0f9e5799934e1fc4743bbe7e1598b930077de3 Mon Sep 17 00:00:00 2001 From: Steve Simpson Date: Fri, 26 Mar 2021 16:43:12 +0100 Subject: [PATCH 1/3] Read alertmanager state from storage if peer settling fails. Signed-off-by: Steve Simpson --- pkg/alertmanager/alertmanager.go | 4 +- pkg/alertmanager/multitenant.go | 1 + pkg/alertmanager/state_replication.go | 25 ++++++++++- pkg/alertmanager/state_replication_test.go | 50 ++++++++++++++++++++-- 4 files changed, 73 insertions(+), 7 deletions(-) diff --git a/pkg/alertmanager/alertmanager.go b/pkg/alertmanager/alertmanager.go index e6880a89e56..6d605ce2458 100644 --- a/pkg/alertmanager/alertmanager.go +++ b/pkg/alertmanager/alertmanager.go @@ -43,6 +43,7 @@ import ( "github.com/prometheus/common/model" "github.com/prometheus/common/route" + "github.com/cortexproject/cortex/pkg/alertmanager/alertstore" "github.com/cortexproject/cortex/pkg/util/services" ) @@ -71,6 +72,7 @@ type Config struct { ShardingEnabled bool ReplicationFactor int Replicator Replicator + Store alertstore.AlertStore } // An Alertmanager manages the alerts for one user. @@ -161,7 +163,7 @@ func New(cfg *Config, reg *prometheus.Registry) (*Alertmanager, error) { am.state = cfg.Peer } else if cfg.ShardingEnabled { level.Debug(am.logger).Log("msg", "starting tenant alertmanager with ring-based replication") - am.state = newReplicatedStates(cfg.UserID, cfg.ReplicationFactor, cfg.Replicator, am.logger, am.registry) + am.state = newReplicatedStates(cfg.UserID, cfg.ReplicationFactor, cfg.Replicator, cfg.Store, am.logger, am.registry) } else { level.Debug(am.logger).Log("msg", "starting tenant alertmanager without replication") am.state = &NilPeer{} diff --git a/pkg/alertmanager/multitenant.go b/pkg/alertmanager/multitenant.go index 57be1fe8c55..204500def53 100644 --- a/pkg/alertmanager/multitenant.go +++ b/pkg/alertmanager/multitenant.go @@ -855,6 +855,7 @@ func (am *MultitenantAlertmanager) newAlertmanager(userID string, amConfig *amco ShardingEnabled: am.cfg.ShardingEnabled, Replicator: am, ReplicationFactor: am.cfg.ShardingRing.ReplicationFactor, + Store: am.store, }, reg) if err != nil { return nil, fmt.Errorf("unable to start Alertmanager for user %v: %v", userID, err) diff --git a/pkg/alertmanager/state_replication.go b/pkg/alertmanager/state_replication.go index 23dac007bfe..499f5c41ed4 100644 --- a/pkg/alertmanager/state_replication.go +++ b/pkg/alertmanager/state_replication.go @@ -15,11 +15,13 @@ import ( "github.com/prometheus/alertmanager/cluster/clusterpb" "github.com/prometheus/client_golang/prometheus" + "github.com/cortexproject/cortex/pkg/alertmanager/alertstore" "github.com/cortexproject/cortex/pkg/util/services" ) const ( defaultSettleReadTimeout = 15 * time.Second + defaultStoreReadTimeout = 15 * time.Second ) // state represents the Alertmanager silences and notification log internal state. @@ -31,12 +33,14 @@ type state struct { reg prometheus.Registerer settleReadTimeout time.Duration + storeReadTimeout time.Duration mtx sync.Mutex states map[string]cluster.State replicationFactor int replicator Replicator + store alertstore.AlertStore partialStateMergesTotal *prometheus.CounterVec partialStateMergesFailed *prometheus.CounterVec @@ -47,17 +51,19 @@ type state struct { } // newReplicatedStates creates a new state struct, which manages state to be replicated between alertmanagers. 
-func newReplicatedStates(userID string, rf int, re Replicator, l log.Logger, r prometheus.Registerer) *state { +func newReplicatedStates(userID string, rf int, re Replicator, st alertstore.AlertStore, l log.Logger, r prometheus.Registerer) *state { s := &state{ logger: l, userID: userID, replicationFactor: rf, replicator: re, + store: st, states: make(map[string]cluster.State, 2), // we use two, one for the notifications and one for silences. msgc: make(chan *clusterpb.Part), reg: r, settleReadTimeout: defaultSettleReadTimeout, + storeReadTimeout: defaultStoreReadTimeout, partialStateMergesTotal: promauto.With(r).NewCounterVec(prometheus.CounterOpts{ Name: "alertmanager_partial_state_merges_total", Help: "Number of times we have received a partial state to merge for a key.", @@ -167,7 +173,22 @@ func (s *state) starting(ctx context.Context) error { } } - level.Info(s.logger).Log("msg", "state not settled but continuing anyway", "err", err) + level.Info(s.logger).Log("msg", "state not settled; trying to read from storage", "err", err) + + // Attempt to read the state from persistent storage instead. + storeReadCtx, cancel := context.WithTimeout(ctx, s.storeReadTimeout) + defer cancel() + + fullState, err := s.store.GetFullState(storeReadCtx, s.userID) + if err == nil { + if err = s.mergeFullStates([]*clusterpb.FullState{fullState.State}); err == nil { + level.Info(s.logger).Log("msg", "state read from storage; proceeding") + return nil + } + } + + level.Info(s.logger).Log("msg", "failed to read state from storage; continuing anyway", "err", err) + return nil } diff --git a/pkg/alertmanager/state_replication_test.go b/pkg/alertmanager/state_replication_test.go index 59c758dc906..fe7e58a9df0 100644 --- a/pkg/alertmanager/state_replication_test.go +++ b/pkg/alertmanager/state_replication_test.go @@ -18,6 +18,8 @@ import ( "github.com/go-kit/kit/log" + "github.com/cortexproject/cortex/pkg/alertmanager/alertspb" + "github.com/cortexproject/cortex/pkg/alertmanager/alertstore" "github.com/cortexproject/cortex/pkg/util/services" ) @@ -76,6 +78,25 @@ func (f *fakeReplicator) ReadFullStateForUser(ctx context.Context, userID string return f.read.res, f.read.err } +type fakeAlertStore struct { + alertstore.AlertStore + + states map[string]alertspb.FullStateDesc +} + +func newFakeAlertStore() *fakeAlertStore { + return &fakeAlertStore{ + states: make(map[string]alertspb.FullStateDesc), + } +} + +func (f *fakeAlertStore) GetFullState(ctx context.Context, user string) (alertspb.FullStateDesc, error) { + if result, ok := f.states[user]; ok { + return result, nil + } + return alertspb.FullStateDesc{}, alertspb.ErrNotFound +} + func TestStateReplication(t *testing.T) { tc := []struct { name string @@ -102,7 +123,8 @@ func TestStateReplication(t *testing.T) { reg := prometheus.NewPedanticRegistry() replicator := newFakeReplicator() replicator.read = readStateResult{res: nil, err: nil} - s := newReplicatedStates("user-1", tt.replicationFactor, replicator, log.NewNopLogger(), reg) + store := newFakeAlertStore() + s := newReplicatedStates("user-1", tt.replicationFactor, replicator, store, log.NewNopLogger(), reg) require.False(t, s.Ready()) { @@ -163,6 +185,7 @@ func TestStateReplication_Settle(t *testing.T) { name string replicationFactor int read readStateResult + states map[string]alertspb.FullStateDesc results map[string][][]byte }{ { @@ -228,9 +251,26 @@ func TestStateReplication_Settle(t *testing.T) { }, }, { - name: "when reading the full state fails, still become ready.", + name: "when reading from 
replicas fails, state is read from storage.", + replicationFactor: 3, + read: readStateResult{err: errors.New("Read Error 1")}, + states: map[string]alertspb.FullStateDesc{ + "user-1": { + State: &clusterpb.FullState{ + Parts: []clusterpb.Part{{Key: "key1", Data: []byte("Datum1")}}, + }, + }, + }, + results: map[string][][]byte{ + "key1": {[]byte("Datum1")}, + "key2": nil, + }, + }, + { + name: "when reading from replicas and from storage fails, still become ready.", replicationFactor: 3, read: readStateResult{err: errors.New("Read Error 1")}, + states: map[string]alertspb.FullStateDesc{}, results: map[string][][]byte{ "key1": nil, "key2": nil, @@ -253,7 +293,9 @@ func TestStateReplication_Settle(t *testing.T) { replicator := newFakeReplicator() replicator.read = tt.read - s := newReplicatedStates("user-1", tt.replicationFactor, replicator, log.NewNopLogger(), reg) + store := newFakeAlertStore() + store.states = tt.states + s := newReplicatedStates("user-1", tt.replicationFactor, replicator, store, log.NewNopLogger(), reg) key1State := &fakeState{} key2State := &fakeState{} @@ -322,7 +364,7 @@ func TestStateReplication_GetFullState(t *testing.T) { for _, tt := range tc { t.Run(tt.name, func(t *testing.T) { reg := prometheus.NewPedanticRegistry() - s := newReplicatedStates("user-1", 1, nil, log.NewNopLogger(), reg) + s := newReplicatedStates("user-1", 1, nil, nil, log.NewNopLogger(), reg) for key, datum := range tt.data { state := &fakeState{binary: datum} From d1cb5cca0b2ad9db091150b63c94c9acfebc3ee8 Mon Sep 17 00:00:00 2001 From: Steve Simpson Date: Tue, 6 Apr 2021 10:54:14 +0200 Subject: [PATCH 2/3] Review comments. Signed-off-by: Steve Simpson --- pkg/alertmanager/state_replication.go | 7 ++++++- pkg/alertmanager/state_replication_test.go | 8 ++++---- 2 files changed, 10 insertions(+), 5 deletions(-) diff --git a/pkg/alertmanager/state_replication.go b/pkg/alertmanager/state_replication.go index 499f5c41ed4..c7e2162f062 100644 --- a/pkg/alertmanager/state_replication.go +++ b/pkg/alertmanager/state_replication.go @@ -15,6 +15,7 @@ import ( "github.com/prometheus/alertmanager/cluster/clusterpb" "github.com/prometheus/client_golang/prometheus" + "github.com/cortexproject/cortex/pkg/alertmanager/alertspb" "github.com/cortexproject/cortex/pkg/alertmanager/alertstore" "github.com/cortexproject/cortex/pkg/util/services" ) @@ -180,6 +181,10 @@ func (s *state) starting(ctx context.Context) error { defer cancel() fullState, err := s.store.GetFullState(storeReadCtx, s.userID) + if err == alertspb.ErrNotFound { + level.Info(s.logger).Log("msg", "no state for user in storage; proceeding", "user", s.userID) + return nil + } if err == nil { if err = s.mergeFullStates([]*clusterpb.FullState{fullState.State}); err == nil { level.Info(s.logger).Log("msg", "state read from storage; proceeding") @@ -187,7 +192,7 @@ func (s *state) starting(ctx context.Context) error { } } - level.Info(s.logger).Log("msg", "failed to read state from storage; continuing anyway", "err", err) + level.Warn(s.logger).Log("msg", "failed to read state from storage; continuing anyway", "err", err) return nil } diff --git a/pkg/alertmanager/state_replication_test.go b/pkg/alertmanager/state_replication_test.go index fe7e58a9df0..8e4bd90e7ec 100644 --- a/pkg/alertmanager/state_replication_test.go +++ b/pkg/alertmanager/state_replication_test.go @@ -185,7 +185,7 @@ func TestStateReplication_Settle(t *testing.T) { name string replicationFactor int read readStateResult - states map[string]alertspb.FullStateDesc + storeStates 
map[string]alertspb.FullStateDesc results map[string][][]byte }{ { @@ -254,7 +254,7 @@ func TestStateReplication_Settle(t *testing.T) { name: "when reading from replicas fails, state is read from storage.", replicationFactor: 3, read: readStateResult{err: errors.New("Read Error 1")}, - states: map[string]alertspb.FullStateDesc{ + storeStates: map[string]alertspb.FullStateDesc{ "user-1": { State: &clusterpb.FullState{ Parts: []clusterpb.Part{{Key: "key1", Data: []byte("Datum1")}}, @@ -270,7 +270,7 @@ func TestStateReplication_Settle(t *testing.T) { name: "when reading from replicas and from storage fails, still become ready.", replicationFactor: 3, read: readStateResult{err: errors.New("Read Error 1")}, - states: map[string]alertspb.FullStateDesc{}, + storeStates: map[string]alertspb.FullStateDesc{}, results: map[string][][]byte{ "key1": nil, "key2": nil, @@ -294,7 +294,7 @@ func TestStateReplication_Settle(t *testing.T) { replicator := newFakeReplicator() replicator.read = tt.read store := newFakeAlertStore() - store.states = tt.states + store.states = tt.storeStates s := newReplicatedStates("user-1", tt.replicationFactor, replicator, store, log.NewNopLogger(), reg) key1State := &fakeState{} From 71f4c7edaae3b4f860d391bc3a9771d4ec1e3d83 Mon Sep 17 00:00:00 2001 From: Steve Simpson Date: Tue, 6 Apr 2021 11:13:01 +0200 Subject: [PATCH 3/3] Review comments. Signed-off-by: Steve Simpson --- pkg/alertmanager/state_replication.go | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/pkg/alertmanager/state_replication.go b/pkg/alertmanager/state_replication.go index c7e2162f062..f749e151d39 100644 --- a/pkg/alertmanager/state_replication.go +++ b/pkg/alertmanager/state_replication.go @@ -181,7 +181,7 @@ func (s *state) starting(ctx context.Context) error { defer cancel() fullState, err := s.store.GetFullState(storeReadCtx, s.userID) - if err == alertspb.ErrNotFound { + if errors.Is(err, alertspb.ErrNotFound) { level.Info(s.logger).Log("msg", "no state for user in storage; proceeding", "user", s.userID) return nil }
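
For reference, below is a sketch of how the storage-fallback tail of (*state).starting in pkg/alertmanager/state_replication.go reads once all three patches are applied. It is assembled from the hunks above rather than copied from the final file; the peer-settling loop that precedes it and the import block (stdlib errors, alertspb, clusterpb, level) are elided.

func (s *state) starting(ctx context.Context) error {
	// ... settling from peer replicas elided; on success the method
	// has already returned nil above, so reaching this point means
	// the settle read failed or timed out (err is non-nil) ...

	level.Info(s.logger).Log("msg", "state not settled; trying to read from storage", "err", err)

	// Attempt to read the state from persistent storage instead.
	storeReadCtx, cancel := context.WithTimeout(ctx, s.storeReadTimeout)
	defer cancel()

	fullState, err := s.store.GetFullState(storeReadCtx, s.userID)
	if errors.Is(err, alertspb.ErrNotFound) {
		// A missing state is expected for a new tenant; it is a clean
		// miss, not a failure, so log at info and start empty.
		level.Info(s.logger).Log("msg", "no state for user in storage; proceeding", "user", s.userID)
		return nil
	}
	if err == nil {
		if err = s.mergeFullStates([]*clusterpb.FullState{fullState.State}); err == nil {
			level.Info(s.logger).Log("msg", "state read from storage; proceeding")
			return nil
		}
	}

	// Both the replica read and the storage read failed: warn, but
	// still come up with empty state rather than blocking startup.
	level.Warn(s.logger).Log("msg", "failed to read state from storage; continuing anyway", "err", err)
	return nil
}

Note the ordering the review converged on: ErrNotFound is checked first via errors.Is (so wrapped not-found errors from a store implementation still match) and exits early at info level, while only genuine read or merge failures fall through to the warning at the bottom. Every path returns nil, so a state-read failure degrades to an empty-state start instead of preventing the tenant Alertmanager from becoming ready.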