17 changes: 16 additions & 1 deletion pkg/alertmanager/alertmanager.go
@@ -137,7 +137,13 @@ func New(cfg *Config, reg *prometheus.Registry) (*Alertmanager, error) {
am.state = cfg.Peer
} else if cfg.ShardingEnabled {
level.Debug(am.logger).Log("msg", "starting tenant alertmanager with ring-based replication")
am.state = newReplicatedStates(cfg.UserID, cfg.ReplicationFactor, cfg.ReplicateStateFunc, cfg.GetPositionFunc, am.stop, am.logger, am.registry)
state := newReplicatedStates(cfg.UserID, cfg.ReplicationFactor, cfg.ReplicateStateFunc, cfg.GetPositionFunc, am.logger, am.registry)

if err := state.Service.StartAsync(context.Background()); err != nil {
return nil, fmt.Errorf("failed to start state replication service: %v", err)
}

am.state = state
} else {
level.Debug(am.logger).Log("msg", "starting tenant alertmanager without replication")
am.state = &NilPeer{}
@@ -319,12 +325,21 @@ func (am *Alertmanager) Stop() {
am.dispatcher.Stop()
}

if service, ok := am.state.(*state); ok {
Contributor Author:

This seemed like the least-worst option for the time being, but I'm open to suggestions. The alternatives would be:

  • Add a Stop() method to the State interface - this means we would need to add a Stop method to Peer in Prometheus.
  • Store the service separately, as stateService or similar.

Contributor:

I think it's cleaner if you cast to the Service interface.

Suggested change:
- if service, ok := am.state.(*state); ok {
+ if service, ok := am.state.(services.Service); ok {

Contributor:

That being said, the next step of refactoring I see (but I would do it in a separate PR) is to require am.state to implement the services.Service interface. We can wrap cfg.Peer into our own struct which implements services.Service (using services.NewIdleService()) and add services.Service to NilPeer too. This way we keep all implementations consistent (a sketch of such a wrapper follows this file's diff).

Thoughts @stevesg @pstibrany ?

Contributor Author:

Seems reasonable. We should consider how to call Settle for the non-sharded case too; ideally that would be consistent (the Service wrapper for the Peer would do the settling in starting, just as the replicated state does).

The alternative to the road we've started on is to have a single "PeerService" which wraps any type of Peer, though that would then need to be extended with some sort of "run" function, so you probably end up back at making them all services...

Contributor:

> so you probably end up back at making them all services...

I think having separate services would keep things cleaner in the end. Code duplication should be minimal.

service.StopAsync()
}

am.alerts.Close()
close(am.stop)
}

func (am *Alertmanager) StopAndWait() {
am.Stop()

if service, ok := am.state.(*state); ok {
_ = service.AwaitTerminated(context.Background())
}

am.wg.Wait()
}
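The wrapper floated in the review thread above is not part of this PR. As a rough sketch only, assuming the existing pkg/util/services helpers and the upstream cluster.Peer.Settle API (the peerService name and the settle interval are illustrative), it could look something like this:

```go
package alertmanager

import (
	"context"
	"time"

	"github.com/prometheus/alertmanager/cluster"

	"github.com/cortexproject/cortex/pkg/util/services"
)

// peerService is a hypothetical wrapper (not part of this PR) that makes the
// upstream cluster.Peer satisfy services.Service, so every am.state
// implementation can be managed with the same lifecycle.
type peerService struct {
	services.Service

	peer *cluster.Peer
}

func newPeerService(peer *cluster.Peer) *peerService {
	p := &peerService{peer: peer}
	// An idle service: the only work happens in the starting function, which
	// waits for the gossip cluster to settle before the service is "running".
	p.Service = services.NewIdleService(p.starting, nil)
	return p
}

func (p *peerService) starting(ctx context.Context) error {
	// Settle blocks until the memberlist cluster has settled or ctx is done.
	// The poll interval here is illustrative.
	p.peer.Settle(ctx, 2*time.Second)
	return nil
}
```

NilPeer could get a similar treatment with a no-op idle service, so Stop() and StopAndWait() could deal uniformly with services.Service instead of type-asserting on *state.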

20 changes: 11 additions & 9 deletions pkg/alertmanager/state_replication.go
@@ -13,10 +13,14 @@ import (
"github.com/prometheus/alertmanager/cluster"
"github.com/prometheus/alertmanager/cluster/clusterpb"
"github.com/prometheus/client_golang/prometheus"

"github.com/cortexproject/cortex/pkg/util/services"
)

// state represents the Alertmanager silences and notification log internal state.
type state struct {
services.Service

userID string
logger log.Logger
reg prometheus.Registerer
@@ -34,12 +38,11 @@ type state struct {
stateReplicationFailed *prometheus.CounterVec

msgc chan *clusterpb.Part
stopc chan struct{}
readyc chan struct{}
}

// newReplicatedStates creates a new state struct, which manages state to be replicated between alertmanagers.
func newReplicatedStates(userID string, rf int, f func(context.Context, string, *clusterpb.Part) error, pf func(string) int, stopc chan struct{}, l log.Logger, r prometheus.Registerer) *state {
func newReplicatedStates(userID string, rf int, f func(context.Context, string, *clusterpb.Part) error, pf func(string) int, l log.Logger, r prometheus.Registerer) *state {

s := &state{
logger: l,
@@ -50,7 +53,6 @@ func newReplicatedStates(userID string, rf int, f func(context.Context, string,
states: make(map[string]cluster.State, 2), // we use two, one for the notifications and one for silences.
msgc: make(chan *clusterpb.Part),
readyc: make(chan struct{}),
stopc: stopc,
reg: r,
partialStateMergesTotal: promauto.With(r).NewCounterVec(prometheus.CounterOpts{
Name: "alertmanager_partial_state_merges_total",
@@ -70,7 +72,8 @@ func newReplicatedStates(userID string, rf int, f func(context.Context, string,
}, []string{"key"}),
}

go s.handleBroadcasts()
s.Service = services.NewBasicService(nil, s.running, nil)

return s
}

@@ -142,23 +145,22 @@ func (s *state) Ready() bool {
return false
}

func (s *state) handleBroadcasts() {
func (s *state) running(ctx context.Context) error {
Contributor:

Let me leave some comments here, though they're not related to running(). Something to discuss (but I would keep it separate from this PR):

  1. Why not do the Settle() in the service's starting function?
  2. If we do #1, would Ready() then wait until the service is running?

Thoughts @stevesg @pstibrany ?

Contributor Author (@stevesg, Mar 9, 2021):

I like the idea of implementing WaitReady in terms of waiting for the service to be running; the service starting/running model seems to fit nicely onto settling/broadcasting. It also means we can remove the readyc channel, which is nice. (A sketch of this follow-up appears after this file's diff.)

Contributor:

Makes sense to me. I was just going to ask why we still need the readyc channel; good that you suggest removing it.

Contributor:

Love this idea as well!

for {
select {
case p := <-s.msgc:
// If the replication factor is <= 1, we don't need to replicate any state anywhere else.
if s.replicationFactor <= 1 {
return
return nil
}

s.stateReplicationTotal.WithLabelValues(p.Key).Inc()
ctx := context.Background() //TODO: I probably need a better context
if err := s.replicateStateFunc(ctx, s.userID, p); err != nil {
s.stateReplicationFailed.WithLabelValues(p.Key).Inc()
level.Error(s.logger).Log("msg", "failed to replicate state to other alertmanagers", "user", s.userID, "key", p.Key, "err", err)
}
case <-s.stopc:
return
case <-ctx.Done():
return nil
}
}
}
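The follow-up discussed in the thread above (settling while the service is starting, and deriving readiness from the service state) is explicitly out of scope for this PR. A minimal sketch, assuming the state struct from this diff and the cortex services.BasicService hooks; the exact signatures are illustrative:

```go
// Hypothetical follow-up, not part of this diff.

// starting would be passed to services.NewBasicService(s.starting, s.running, nil).
func (s *state) starting(ctx context.Context) error {
	// The settling logic (fetching and merging a full state snapshot from the
	// other replicas) would live here, so the service only reaches Running
	// once its state has settled, or gives up when ctx is cancelled.
	return nil
}

// WaitReady would simply block until the service is Running, which is what
// lets the readyc channel be removed.
func (s *state) WaitReady(ctx context.Context) error {
	return s.Service.AwaitRunning(ctx)
}

// Ready becomes a non-blocking check of the service state.
func (s *state) Ready() bool {
	return s.Service.State() == services.Running
}
```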
3 changes: 2 additions & 1 deletion pkg/alertmanager/state_replication_test.go
@@ -64,7 +64,8 @@ func TestStateReplication(t *testing.T) {
return 0
}

s := newReplicatedStates("user-1", tt.replicationFactor, replicationFunc, positionFunc, make(chan struct{}), log.NewNopLogger(), reg)
s := newReplicatedStates("user-1", tt.replicationFactor, replicationFunc, positionFunc, log.NewNopLogger(), reg)
require.NoError(t, s.Service.StartAsync(context.Background()))

ch := s.AddState("nflog", &fakeState{}, reg)

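Rather than the bare StartAsync call in the test above, callers that need the full lifecycle could lean on the existing start/stop helpers in pkg/util/services. A sketch, reusing the replicationFunc, positionFunc, reg, and fakeState from the test in this diff (using the helpers here is just a suggestion, not part of the change):

```go
ctx := context.Background()

s := newReplicatedStates("user-1", tt.replicationFactor, replicationFunc, positionFunc, log.NewNopLogger(), reg)

// Start the service and block until it is Running, instead of fire-and-forget.
require.NoError(t, services.StartAndAwaitRunning(ctx, s))

ch := s.AddState("nflog", &fakeState{}, reg)
_ = ch // exercise broadcasts / assertions here, as in the existing test

// Ensure the running goroutine has fully terminated before the test returns.
require.NoError(t, services.StopAndAwaitTerminated(ctx, s))
```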