diff --git a/pkg/alertmanager/alertmanager.go b/pkg/alertmanager/alertmanager.go index b0eaf0b81a5..e352b89b77f 100644 --- a/pkg/alertmanager/alertmanager.go +++ b/pkg/alertmanager/alertmanager.go @@ -15,6 +15,7 @@ import ( "github.com/go-kit/kit/log" "github.com/go-kit/kit/log/level" + "github.com/pkg/errors" "github.com/prometheus/alertmanager/api" "github.com/prometheus/alertmanager/cluster" "github.com/prometheus/alertmanager/cluster/clusterpb" @@ -41,6 +42,8 @@ import ( "github.com/prometheus/client_golang/prometheus/promauto" "github.com/prometheus/common/model" "github.com/prometheus/common/route" + + "github.com/cortexproject/cortex/pkg/util/services" ) const notificationLogMaintenancePeriod = 15 * time.Minute @@ -137,7 +140,13 @@ func New(cfg *Config, reg *prometheus.Registry) (*Alertmanager, error) { am.state = cfg.Peer } else if cfg.ShardingEnabled { level.Debug(am.logger).Log("msg", "starting tenant alertmanager with ring-based replication") - am.state = newReplicatedStates(cfg.UserID, cfg.ReplicationFactor, cfg.ReplicateStateFunc, cfg.GetPositionFunc, am.stop, am.logger, am.registry) + state := newReplicatedStates(cfg.UserID, cfg.ReplicationFactor, cfg.ReplicateStateFunc, cfg.GetPositionFunc, am.logger, am.registry) + + if err := state.Service.StartAsync(context.Background()); err != nil { + return nil, errors.Wrap(err, "failed to start ring-based replication service") + } + + am.state = state } else { level.Debug(am.logger).Log("msg", "starting tenant alertmanager without replication") am.state = &NilPeer{} @@ -319,12 +328,23 @@ func (am *Alertmanager) Stop() { am.dispatcher.Stop() } + if service, ok := am.state.(services.Service); ok { + service.StopAsync() + } + am.alerts.Close() close(am.stop) } func (am *Alertmanager) StopAndWait() { am.Stop() + + if service, ok := am.state.(services.Service); ok { + if err := service.AwaitTerminated(context.Background()); err != nil { + level.Warn(am.logger).Log("msg", "error while stopping ring-based replication service", "err", err) + } + } + am.wg.Wait() } diff --git a/pkg/alertmanager/state_replication.go b/pkg/alertmanager/state_replication.go index 9bb98ded335..d5f3e4f60b5 100644 --- a/pkg/alertmanager/state_replication.go +++ b/pkg/alertmanager/state_replication.go @@ -13,10 +13,14 @@ import ( "github.com/prometheus/alertmanager/cluster" "github.com/prometheus/alertmanager/cluster/clusterpb" "github.com/prometheus/client_golang/prometheus" + + "github.com/cortexproject/cortex/pkg/util/services" ) // state represents the Alertmanager silences and notification log internal state. type state struct { + services.Service + userID string logger log.Logger reg prometheus.Registerer @@ -34,12 +38,11 @@ type state struct { stateReplicationFailed *prometheus.CounterVec msgc chan *clusterpb.Part - stopc chan struct{} readyc chan struct{} } // newReplicatedStates creates a new state struct, which manages state to be replicated between alertmanagers. -func newReplicatedStates(userID string, rf int, f func(context.Context, string, *clusterpb.Part) error, pf func(string) int, stopc chan struct{}, l log.Logger, r prometheus.Registerer) *state { +func newReplicatedStates(userID string, rf int, f func(context.Context, string, *clusterpb.Part) error, pf func(string) int, l log.Logger, r prometheus.Registerer) *state { s := &state{ logger: l, @@ -50,7 +53,6 @@ func newReplicatedStates(userID string, rf int, f func(context.Context, string, states: make(map[string]cluster.State, 2), // we use two, one for the notifications and one for silences. msgc: make(chan *clusterpb.Part), readyc: make(chan struct{}), - stopc: stopc, reg: r, partialStateMergesTotal: promauto.With(r).NewCounterVec(prometheus.CounterOpts{ Name: "alertmanager_partial_state_merges_total", @@ -70,7 +72,8 @@ func newReplicatedStates(userID string, rf int, f func(context.Context, string, }, []string{"key"}), } - go s.handleBroadcasts() + s.Service = services.NewBasicService(nil, s.running, nil) + return s } @@ -142,23 +145,22 @@ func (s *state) Ready() bool { return false } -func (s *state) handleBroadcasts() { +func (s *state) running(ctx context.Context) error { for { select { case p := <-s.msgc: // If the replication factor is <= 1, we don't need to replicate any state anywhere else. if s.replicationFactor <= 1 { - return + return nil } s.stateReplicationTotal.WithLabelValues(p.Key).Inc() - ctx := context.Background() //TODO: I probably need a better context if err := s.replicateStateFunc(ctx, s.userID, p); err != nil { s.stateReplicationFailed.WithLabelValues(p.Key).Inc() level.Error(s.logger).Log("msg", "failed to replicate state to other alertmanagers", "user", s.userID, "key", p.Key, "err", err) } - case <-s.stopc: - return + case <-ctx.Done(): + return nil } } } diff --git a/pkg/alertmanager/state_replication_test.go b/pkg/alertmanager/state_replication_test.go index 431be525749..09713ff6f1c 100644 --- a/pkg/alertmanager/state_replication_test.go +++ b/pkg/alertmanager/state_replication_test.go @@ -15,6 +15,8 @@ import ( "github.com/stretchr/testify/require" "github.com/go-kit/kit/log" + + "github.com/cortexproject/cortex/pkg/util/services" ) type fakeState struct{} @@ -64,7 +66,12 @@ func TestStateReplication(t *testing.T) { return 0 } - s := newReplicatedStates("user-1", tt.replicationFactor, replicationFunc, positionFunc, make(chan struct{}), log.NewNopLogger(), reg) + s := newReplicatedStates("user-1", tt.replicationFactor, replicationFunc, positionFunc, log.NewNopLogger(), reg) + + require.NoError(t, services.StartAndAwaitRunning(context.Background(), s)) + t.Cleanup(func() { + require.NoError(t, services.StopAndAwaitTerminated(context.Background(), s)) + }) ch := s.AddState("nflog", &fakeState{}, reg)