diff --git a/integration/alertmanager_test.go b/integration/alertmanager_test.go index 1867ee2c905..7f963d23362 100644 --- a/integration/alertmanager_test.go +++ b/integration/alertmanager_test.go @@ -233,9 +233,6 @@ func TestAlertmanagerClustering(t *testing.T) { } func TestAlertmanagerSharding(t *testing.T) { - // TODO See: https://github.com/cortexproject/cortex/issues/3927 - t.Skip("this test is skipped because of a bug in the alertmanager sharding logic, which is currently under development") - tests := map[string]struct { legacyAlertStore bool }{ diff --git a/pkg/alertmanager/alertmanager.go b/pkg/alertmanager/alertmanager.go index a370bfe46fd..e6880a89e56 100644 --- a/pkg/alertmanager/alertmanager.go +++ b/pkg/alertmanager/alertmanager.go @@ -129,6 +129,8 @@ type Replicator interface { // The alertmanager replication protocol relies on a position related to other replicas. // This position is then used to identify who should notify about the alert first. GetPositionForUser(userID string) int + // ReadFullStateForUser obtains the full state from other replicas in the cluster. + ReadFullStateForUser(context.Context, string) ([]*clusterpb.FullState, error) } // New creates a new Alertmanager. @@ -159,13 +161,7 @@ func New(cfg *Config, reg *prometheus.Registry) (*Alertmanager, error) { am.state = cfg.Peer } else if cfg.ShardingEnabled { level.Debug(am.logger).Log("msg", "starting tenant alertmanager with ring-based replication") - state := newReplicatedStates(cfg.UserID, cfg.ReplicationFactor, cfg.Replicator, am.logger, am.registry) - - if err := state.Service.StartAsync(context.Background()); err != nil { - return nil, errors.Wrap(err, "failed to start ring-based replication service") - } - - am.state = state + am.state = newReplicatedStates(cfg.UserID, cfg.ReplicationFactor, cfg.Replicator, am.logger, am.registry) } else { level.Debug(am.logger).Log("msg", "starting tenant alertmanager without replication") am.state = &NilPeer{} @@ -203,6 +199,13 @@ func New(cfg *Config, reg *prometheus.Registry) (*Alertmanager, error) { c = am.state.AddState("sil:"+cfg.UserID, am.silences, am.registry) am.silences.SetBroadcast(c.Broadcast) + // State replication needs to be started after the state keys are defined. + if service, ok := am.state.(services.Service); ok { + if err := service.StartAsync(context.Background()); err != nil { + return nil, errors.Wrap(err, "failed to start ring-based replication service") + } + } + am.pipelineBuilder = notify.NewPipelineBuilder(am.registry) am.wg.Add(1) @@ -373,6 +376,13 @@ func (am *Alertmanager) mergePartialExternalState(part *clusterpb.Part) error { return errors.New("ring-based sharding not enabled") } +func (am *Alertmanager) getFullState() (*clusterpb.FullState, error) { + if state, ok := am.state.(*state); ok { + return state.GetFullState() + } + return nil, errors.New("ring-based sharding not enabled") +} + // buildIntegrationsMap builds a map of name to the list of integration notifiers off of a // list of receiver config. 
func buildIntegrationsMap(nc []*config.Receiver, tmpl *template.Template, logger log.Logger) (map[string][]notify.Integration, error) { diff --git a/pkg/alertmanager/alertmanagerpb/alertmanager.pb.go b/pkg/alertmanager/alertmanagerpb/alertmanager.pb.go index f7a400918b2..ecb21512fec 100644 --- a/pkg/alertmanager/alertmanagerpb/alertmanager.pb.go +++ b/pkg/alertmanager/alertmanagerpb/alertmanager.pb.go @@ -6,6 +6,7 @@ package alertmanagerpb import ( context "context" fmt "fmt" + _ "github.com/gogo/protobuf/gogoproto" proto "github.com/gogo/protobuf/proto" clusterpb "github.com/prometheus/alertmanager/cluster/clusterpb" httpgrpc "github.com/weaveworks/common/httpgrpc" @@ -55,6 +56,33 @@ func (UpdateStateStatus) EnumDescriptor() ([]byte, []int) { return fileDescriptor_e60437b6e0c74c9a, []int{0} } +type ReadStateStatus int32 + +const ( + READ_UNSPECIFIED ReadStateStatus = 0 + READ_OK ReadStateStatus = 1 + READ_ERROR ReadStateStatus = 2 + READ_USER_NOT_FOUND ReadStateStatus = 3 +) + +var ReadStateStatus_name = map[int32]string{ + 0: "READ_UNSPECIFIED", + 1: "READ_OK", + 2: "READ_ERROR", + 3: "READ_USER_NOT_FOUND", +} + +var ReadStateStatus_value = map[string]int32{ + "READ_UNSPECIFIED": 0, + "READ_OK": 1, + "READ_ERROR": 2, + "READ_USER_NOT_FOUND": 3, +} + +func (ReadStateStatus) EnumDescriptor() ([]byte, []int) { + return fileDescriptor_e60437b6e0c74c9a, []int{1} +} + type UpdateStateResponse struct { Status UpdateStateStatus `protobuf:"varint,1,opt,name=status,proto3,enum=alertmanagerpb.UpdateStateStatus" json:"status,omitempty"` Error string `protobuf:"bytes,2,opt,name=error,proto3" json:"error,omitempty"` @@ -106,39 +134,144 @@ func (m *UpdateStateResponse) GetError() string { return "" } +type ReadStateRequest struct { +} + +func (m *ReadStateRequest) Reset() { *m = ReadStateRequest{} } +func (*ReadStateRequest) ProtoMessage() {} +func (*ReadStateRequest) Descriptor() ([]byte, []int) { + return fileDescriptor_e60437b6e0c74c9a, []int{1} +} +func (m *ReadStateRequest) XXX_Unmarshal(b []byte) error { + return m.Unmarshal(b) +} +func (m *ReadStateRequest) XXX_Marshal(b []byte, deterministic bool) ([]byte, error) { + if deterministic { + return xxx_messageInfo_ReadStateRequest.Marshal(b, m, deterministic) + } else { + b = b[:cap(b)] + n, err := m.MarshalToSizedBuffer(b) + if err != nil { + return nil, err + } + return b[:n], nil + } +} +func (m *ReadStateRequest) XXX_Merge(src proto.Message) { + xxx_messageInfo_ReadStateRequest.Merge(m, src) +} +func (m *ReadStateRequest) XXX_Size() int { + return m.Size() +} +func (m *ReadStateRequest) XXX_DiscardUnknown() { + xxx_messageInfo_ReadStateRequest.DiscardUnknown(m) +} + +var xxx_messageInfo_ReadStateRequest proto.InternalMessageInfo + +type ReadStateResponse struct { + Status ReadStateStatus `protobuf:"varint,1,opt,name=status,proto3,enum=alertmanagerpb.ReadStateStatus" json:"status,omitempty"` + Error string `protobuf:"bytes,2,opt,name=error,proto3" json:"error,omitempty"` + State *clusterpb.FullState `protobuf:"bytes,3,opt,name=state,proto3" json:"state,omitempty"` +} + +func (m *ReadStateResponse) Reset() { *m = ReadStateResponse{} } +func (*ReadStateResponse) ProtoMessage() {} +func (*ReadStateResponse) Descriptor() ([]byte, []int) { + return fileDescriptor_e60437b6e0c74c9a, []int{2} +} +func (m *ReadStateResponse) XXX_Unmarshal(b []byte) error { + return m.Unmarshal(b) +} +func (m *ReadStateResponse) XXX_Marshal(b []byte, deterministic bool) ([]byte, error) { + if deterministic { + return xxx_messageInfo_ReadStateResponse.Marshal(b, m, 
deterministic) + } else { + b = b[:cap(b)] + n, err := m.MarshalToSizedBuffer(b) + if err != nil { + return nil, err + } + return b[:n], nil + } +} +func (m *ReadStateResponse) XXX_Merge(src proto.Message) { + xxx_messageInfo_ReadStateResponse.Merge(m, src) +} +func (m *ReadStateResponse) XXX_Size() int { + return m.Size() +} +func (m *ReadStateResponse) XXX_DiscardUnknown() { + xxx_messageInfo_ReadStateResponse.DiscardUnknown(m) +} + +var xxx_messageInfo_ReadStateResponse proto.InternalMessageInfo + +func (m *ReadStateResponse) GetStatus() ReadStateStatus { + if m != nil { + return m.Status + } + return READ_UNSPECIFIED +} + +func (m *ReadStateResponse) GetError() string { + if m != nil { + return m.Error + } + return "" +} + +func (m *ReadStateResponse) GetState() *clusterpb.FullState { + if m != nil { + return m.State + } + return nil +} + func init() { proto.RegisterEnum("alertmanagerpb.UpdateStateStatus", UpdateStateStatus_name, UpdateStateStatus_value) + proto.RegisterEnum("alertmanagerpb.ReadStateStatus", ReadStateStatus_name, ReadStateStatus_value) proto.RegisterType((*UpdateStateResponse)(nil), "alertmanagerpb.UpdateStateResponse") + proto.RegisterType((*ReadStateRequest)(nil), "alertmanagerpb.ReadStateRequest") + proto.RegisterType((*ReadStateResponse)(nil), "alertmanagerpb.ReadStateResponse") } func init() { proto.RegisterFile("alertmanager.proto", fileDescriptor_e60437b6e0c74c9a) } var fileDescriptor_e60437b6e0c74c9a = []byte{ - // 377 bytes of a gzipped FileDescriptorProto - 0x1f, 0x8b, 0x08, 0x00, 0x00, 0x00, 0x00, 0x00, 0x02, 0xff, 0x7c, 0x92, 0x3f, 0x6f, 0x1a, 0x31, - 0x18, 0xc6, 0x6d, 0xaa, 0x22, 0xd5, 0xb4, 0x40, 0xdd, 0x3f, 0x42, 0x37, 0x58, 0x94, 0x2e, 0xa8, - 0xc3, 0x9d, 0x44, 0xbb, 0x74, 0xa3, 0x88, 0x6b, 0x91, 0xaa, 0x72, 0xc8, 0xc0, 0xd2, 0x05, 0xf9, - 0x0e, 0x07, 0xa2, 0x70, 0x67, 0xc7, 0xf6, 0x85, 0x35, 0x1f, 0x21, 0x5b, 0xbe, 0x42, 0x3e, 0x4a, - 0x46, 0x46, 0xc6, 0x70, 0x2c, 0x19, 0xf9, 0x08, 0x91, 0x38, 0x02, 0x17, 0x22, 0x65, 0xf2, 0xeb, - 0x47, 0x7e, 0x7e, 0x7a, 0xde, 0xd7, 0x2f, 0xc2, 0x6c, 0xc6, 0x95, 0x09, 0x59, 0xc4, 0x26, 0x5c, - 0xd9, 0x52, 0x09, 0x23, 0x70, 0x31, 0xab, 0x49, 0xdf, 0xfa, 0x31, 0x39, 0x35, 0xd3, 0xd8, 0xb7, - 0x03, 0x11, 0x3a, 0x73, 0xce, 0x2e, 0xf8, 0x5c, 0xa8, 0x33, 0xed, 0x04, 0x22, 0x0c, 0x45, 0xe4, - 0x4c, 0x8d, 0x91, 0x13, 0x25, 0x83, 0x7d, 0x91, 0x52, 0xac, 0x56, 0xc6, 0x25, 0x95, 0x08, 0xb9, - 0x99, 0xf2, 0x58, 0x3b, 0x59, 0xb6, 0x13, 0xcc, 0x62, 0x6d, 0x0e, 0xa7, 0xf4, 0x1f, 0xab, 0x94, - 0x51, 0x3b, 0x41, 0x1f, 0x86, 0x72, 0xcc, 0x0c, 0xef, 0x1b, 0x66, 0x38, 0xe5, 0x5a, 0x8a, 0x48, - 0x73, 0xfc, 0x13, 0xe5, 0xb5, 0x61, 0x26, 0xd6, 0x15, 0x58, 0x85, 0xf5, 0x62, 0xe3, 0x8b, 0xfd, - 0x34, 0xb1, 0x9d, 0x31, 0xf5, 0xb7, 0x0f, 0xe9, 0xce, 0x80, 0x3f, 0xa2, 0xd7, 0x5c, 0x29, 0xa1, - 0x2a, 0xb9, 0x2a, 0xac, 0xbf, 0xa1, 0xe9, 0xe5, 0x5b, 0x13, 0xbd, 0x7f, 0x66, 0xc1, 0x79, 0x94, - 0xf3, 0xfe, 0x96, 0x01, 0x2e, 0xa1, 0xc2, 0x3f, 0x97, 0xfe, 0x71, 0x47, 0x2e, 0xa5, 0x1e, 0x2d, - 0xe7, 0x30, 0x46, 0xc5, 0x61, 0xdf, 0xa5, 0xa3, 0xae, 0x37, 0x18, 0xfd, 0xf6, 0x86, 0xdd, 0x76, - 0xf9, 0x55, 0xe3, 0x1a, 0xa2, 0xb7, 0xbf, 0x32, 0x21, 0x70, 0x13, 0xbd, 0xeb, 0xb0, 0x68, 0x3c, - 0xe3, 0x94, 0x9f, 0xc7, 0x5c, 0x1b, 0xfc, 0xc9, 0xde, 0x0f, 0xa8, 0x33, 0x18, 0xf4, 0x76, 0xb2, - 0xf5, 0xf9, 0x58, 0x4e, 0x7b, 0xac, 0x01, 0xec, 0xa2, 0x42, 0x26, 0x14, 0x2e, 0xd9, 0xfb, 0x29, - 0xd9, 0x3d, 0xa6, 0x8c, 0xf5, 0xf5, 0x85, 0xae, 0x0f, 0x98, 0x56, 0x7b, 0xb1, 0x22, 0x60, 0xb9, - 0x22, 0x60, 0xb3, 0x22, 0xf0, 0x32, 0x21, 0xf0, 0x26, 0x21, 0xf0, 0x36, 0x21, 
0x70, 0x91, 0x10, - 0x78, 0x97, 0x10, 0x78, 0x9f, 0x10, 0xb0, 0x49, 0x08, 0xbc, 0x5a, 0x13, 0xb0, 0x58, 0x13, 0xb0, - 0x5c, 0x13, 0xf0, 0xff, 0x68, 0x07, 0xfc, 0xfc, 0xf6, 0x43, 0xbe, 0x3f, 0x04, 0x00, 0x00, 0xff, - 0xff, 0xcc, 0x96, 0xf1, 0xad, 0x30, 0x02, 0x00, 0x00, + // 509 bytes of a gzipped FileDescriptorProto + 0x1f, 0x8b, 0x08, 0x00, 0x00, 0x00, 0x00, 0x00, 0x02, 0xff, 0x8c, 0x53, 0x41, 0x6f, 0x12, 0x41, + 0x18, 0x9d, 0xa1, 0x16, 0xd3, 0x0f, 0x85, 0xed, 0x14, 0x95, 0x70, 0x98, 0x52, 0xbc, 0x10, 0x0e, + 0xbb, 0x09, 0x9a, 0x18, 0x3d, 0xb5, 0x95, 0xc5, 0x36, 0x8d, 0x40, 0x06, 0xb8, 0x98, 0x18, 0x32, + 0xc0, 0x08, 0x46, 0x60, 0xd6, 0xd9, 0x59, 0x7b, 0xf5, 0x27, 0x78, 0xf0, 0x07, 0x78, 0xf4, 0xa7, + 0x78, 0xe4, 0xd8, 0xa3, 0x2c, 0x97, 0x26, 0x5e, 0xfa, 0x13, 0x4c, 0x59, 0x76, 0x5d, 0xd7, 0xd8, + 0xf4, 0xb4, 0xdf, 0xbc, 0xf9, 0xde, 0x7b, 0xf3, 0xbd, 0x99, 0x05, 0xc2, 0xa7, 0x42, 0xe9, 0x19, + 0x9f, 0xf3, 0xb1, 0x50, 0xa6, 0xa3, 0xa4, 0x96, 0x24, 0x1b, 0xc7, 0x9c, 0x41, 0x31, 0x3f, 0x96, + 0x63, 0xb9, 0xde, 0xb2, 0xae, 0xab, 0xa0, 0xab, 0xf8, 0x74, 0xfc, 0x5e, 0x4f, 0xbc, 0x81, 0x39, + 0x94, 0x33, 0xeb, 0x5c, 0xf0, 0x4f, 0xe2, 0x5c, 0xaa, 0x0f, 0xae, 0x35, 0x94, 0xb3, 0x99, 0x9c, + 0x5b, 0x13, 0xad, 0x9d, 0xb1, 0x72, 0x86, 0x51, 0xb1, 0x61, 0x1d, 0xc7, 0x58, 0x8e, 0x92, 0x33, + 0xa1, 0x27, 0xc2, 0x73, 0xad, 0xb8, 0xa3, 0x35, 0x9c, 0x7a, 0xae, 0xfe, 0xf3, 0x75, 0x06, 0x61, + 0x15, 0x68, 0x94, 0xdf, 0xc1, 0x5e, 0xcf, 0x19, 0x71, 0x2d, 0x3a, 0x9a, 0x6b, 0xc1, 0x84, 0xeb, + 0xc8, 0xb9, 0x2b, 0xc8, 0x73, 0x48, 0xbb, 0x9a, 0x6b, 0xcf, 0x2d, 0xe0, 0x12, 0xae, 0x64, 0x6b, + 0x07, 0xe6, 0xdf, 0x73, 0x98, 0x31, 0x52, 0x67, 0xdd, 0xc8, 0x36, 0x04, 0x92, 0x87, 0x6d, 0xa1, + 0x94, 0x54, 0x85, 0x54, 0x09, 0x57, 0x76, 0x58, 0xb0, 0x28, 0x13, 0x30, 0x98, 0xe0, 0xa3, 0x8d, + 0xcb, 0x47, 0x4f, 0xb8, 0xba, 0xfc, 0x15, 0xc3, 0x6e, 0x0c, 0xdc, 0x58, 0x3f, 0x4b, 0x58, 0xef, + 0x27, 0xad, 0x23, 0xca, 0x6d, 0x8c, 0x49, 0x15, 0xb6, 0xaf, 0xf7, 0x45, 0x61, 0xab, 0x84, 0x2b, + 0x99, 0x5a, 0xde, 0x8c, 0x92, 0x30, 0x1b, 0xde, 0x74, 0x1a, 0x78, 0x07, 0x2d, 0x2f, 0xee, 0x5c, + 0x7e, 0xdb, 0x47, 0xd5, 0x43, 0xd8, 0xfd, 0x67, 0x3a, 0x92, 0x86, 0x54, 0xeb, 0xcc, 0x40, 0x24, + 0x07, 0x99, 0xd7, 0x36, 0x7b, 0x65, 0xf7, 0x6d, 0xc6, 0x5a, 0xcc, 0x48, 0x11, 0x02, 0xd9, 0x5e, + 0xc7, 0x66, 0xfd, 0x66, 0xab, 0xdb, 0x6f, 0xb4, 0x7a, 0xcd, 0xba, 0xb1, 0x55, 0x7d, 0x0b, 0xb9, + 0xc4, 0x21, 0x49, 0x1e, 0x0c, 0x66, 0x1f, 0xd5, 0xfb, 0xbd, 0x66, 0xa7, 0x6d, 0xbf, 0x3c, 0x6d, + 0x9c, 0xda, 0x75, 0x03, 0x91, 0x0c, 0xdc, 0x5d, 0xa3, 0xad, 0x33, 0x03, 0x93, 0x2c, 0xc0, 0x7a, + 0x11, 0x2a, 0x3f, 0x82, 0xbd, 0x80, 0x92, 0x90, 0xaf, 0xfd, 0xc2, 0x70, 0xef, 0x28, 0x96, 0x09, + 0x39, 0x84, 0xfb, 0x27, 0x7c, 0x3e, 0x9a, 0x86, 0xc9, 0x92, 0x07, 0x66, 0xf4, 0x54, 0x4e, 0xba, + 0xdd, 0xf6, 0x06, 0x2e, 0x3e, 0x4c, 0xc2, 0x41, 0xe4, 0x65, 0x44, 0x6c, 0xc8, 0xc4, 0x66, 0x26, + 0xb9, 0x58, 0x4a, 0x6d, 0xae, 0x74, 0xf1, 0xf1, 0x0d, 0xf7, 0x1f, 0x93, 0x61, 0xb0, 0x13, 0x0d, + 0x4e, 0x4a, 0xff, 0xbd, 0xb8, 0xf0, 0x3c, 0x07, 0x37, 0x74, 0x84, 0x9a, 0xc7, 0xf5, 0xc5, 0x92, + 0xa2, 0x8b, 0x25, 0x45, 0x57, 0x4b, 0x8a, 0x3f, 0xfb, 0x14, 0x7f, 0xf7, 0x29, 0xfe, 0xe1, 0x53, + 0xbc, 0xf0, 0x29, 0xfe, 0xe9, 0x53, 0x7c, 0xe9, 0x53, 0x74, 0xe5, 0x53, 0xfc, 0x65, 0x45, 0xd1, + 0x62, 0x45, 0xd1, 0xc5, 0x8a, 0xa2, 0x37, 0x89, 0xff, 0x6e, 0x90, 0x5e, 0x3f, 0xf7, 0x27, 0xbf, + 0x03, 0x00, 0x00, 0xff, 0xff, 0x81, 0x5b, 0x6b, 0x33, 0xa4, 0x03, 0x00, 0x00, } func (x UpdateStateStatus) String() string { @@ -148,6 +281,13 @@ func (x UpdateStateStatus) String() string { } 
return strconv.Itoa(int(x)) } +func (x ReadStateStatus) String() string { + s, ok := ReadStateStatus_name[int32(x)] + if ok { + return s + } + return strconv.Itoa(int(x)) +} func (this *UpdateStateResponse) Equal(that interface{}) bool { if that == nil { return this == nil @@ -175,6 +315,27 @@ func (this *UpdateStateResponse) Equal(that interface{}) bool { } return true } +func (this *ReadStateRequest) Equal(that interface{}) bool { + if that == nil { + return this == nil + } + + that1, ok := that.(*ReadStateRequest) + if !ok { + that2, ok := that.(ReadStateRequest) + if ok { + that1 = &that2 + } else { + return false + } + } + if that1 == nil { + return this == nil + } else if this == nil { + return false + } + return true +} func (this *UpdateStateResponse) GoString() string { if this == nil { return "nil" @@ -186,6 +347,29 @@ func (this *UpdateStateResponse) GoString() string { s = append(s, "}") return strings.Join(s, "") } +func (this *ReadStateRequest) GoString() string { + if this == nil { + return "nil" + } + s := make([]string, 0, 4) + s = append(s, "&alertmanagerpb.ReadStateRequest{") + s = append(s, "}") + return strings.Join(s, "") +} +func (this *ReadStateResponse) GoString() string { + if this == nil { + return "nil" + } + s := make([]string, 0, 7) + s = append(s, "&alertmanagerpb.ReadStateResponse{") + s = append(s, "Status: "+fmt.Sprintf("%#v", this.Status)+",\n") + s = append(s, "Error: "+fmt.Sprintf("%#v", this.Error)+",\n") + if this.State != nil { + s = append(s, "State: "+fmt.Sprintf("%#v", this.State)+",\n") + } + s = append(s, "}") + return strings.Join(s, "") +} func valueToGoStringAlertmanager(v interface{}, typ string) string { rv := reflect.ValueOf(v) if rv.IsNil() { @@ -209,6 +393,7 @@ const _ = grpc.SupportPackageIsVersion4 type AlertmanagerClient interface { HandleRequest(ctx context.Context, in *httpgrpc.HTTPRequest, opts ...grpc.CallOption) (*httpgrpc.HTTPResponse, error) UpdateState(ctx context.Context, in *clusterpb.Part, opts ...grpc.CallOption) (*UpdateStateResponse, error) + ReadState(ctx context.Context, in *ReadStateRequest, opts ...grpc.CallOption) (*ReadStateResponse, error) } type alertmanagerClient struct { @@ -237,10 +422,20 @@ func (c *alertmanagerClient) UpdateState(ctx context.Context, in *clusterpb.Part return out, nil } +func (c *alertmanagerClient) ReadState(ctx context.Context, in *ReadStateRequest, opts ...grpc.CallOption) (*ReadStateResponse, error) { + out := new(ReadStateResponse) + err := c.cc.Invoke(ctx, "/alertmanagerpb.Alertmanager/ReadState", in, out, opts...) + if err != nil { + return nil, err + } + return out, nil +} + // AlertmanagerServer is the server API for Alertmanager service. type AlertmanagerServer interface { HandleRequest(context.Context, *httpgrpc.HTTPRequest) (*httpgrpc.HTTPResponse, error) UpdateState(context.Context, *clusterpb.Part) (*UpdateStateResponse, error) + ReadState(context.Context, *ReadStateRequest) (*ReadStateResponse, error) } // UnimplementedAlertmanagerServer can be embedded to have forward compatible implementations. 
@@ -253,6 +448,9 @@ func (*UnimplementedAlertmanagerServer) HandleRequest(ctx context.Context, req * func (*UnimplementedAlertmanagerServer) UpdateState(ctx context.Context, req *clusterpb.Part) (*UpdateStateResponse, error) { return nil, status.Errorf(codes.Unimplemented, "method UpdateState not implemented") } +func (*UnimplementedAlertmanagerServer) ReadState(ctx context.Context, req *ReadStateRequest) (*ReadStateResponse, error) { + return nil, status.Errorf(codes.Unimplemented, "method ReadState not implemented") +} func RegisterAlertmanagerServer(s *grpc.Server, srv AlertmanagerServer) { s.RegisterService(&_Alertmanager_serviceDesc, srv) @@ -294,6 +492,24 @@ func _Alertmanager_UpdateState_Handler(srv interface{}, ctx context.Context, dec return interceptor(ctx, in, info, handler) } +func _Alertmanager_ReadState_Handler(srv interface{}, ctx context.Context, dec func(interface{}) error, interceptor grpc.UnaryServerInterceptor) (interface{}, error) { + in := new(ReadStateRequest) + if err := dec(in); err != nil { + return nil, err + } + if interceptor == nil { + return srv.(AlertmanagerServer).ReadState(ctx, in) + } + info := &grpc.UnaryServerInfo{ + Server: srv, + FullMethod: "/alertmanagerpb.Alertmanager/ReadState", + } + handler := func(ctx context.Context, req interface{}) (interface{}, error) { + return srv.(AlertmanagerServer).ReadState(ctx, req.(*ReadStateRequest)) + } + return interceptor(ctx, in, info, handler) +} + var _Alertmanager_serviceDesc = grpc.ServiceDesc{ ServiceName: "alertmanagerpb.Alertmanager", HandlerType: (*AlertmanagerServer)(nil), @@ -306,6 +522,10 @@ var _Alertmanager_serviceDesc = grpc.ServiceDesc{ MethodName: "UpdateState", Handler: _Alertmanager_UpdateState_Handler, }, + { + MethodName: "ReadState", + Handler: _Alertmanager_ReadState_Handler, + }, }, Streams: []grpc.StreamDesc{}, Metadata: "alertmanager.proto", @@ -346,6 +566,76 @@ func (m *UpdateStateResponse) MarshalToSizedBuffer(dAtA []byte) (int, error) { return len(dAtA) - i, nil } +func (m *ReadStateRequest) Marshal() (dAtA []byte, err error) { + size := m.Size() + dAtA = make([]byte, size) + n, err := m.MarshalToSizedBuffer(dAtA[:size]) + if err != nil { + return nil, err + } + return dAtA[:n], nil +} + +func (m *ReadStateRequest) MarshalTo(dAtA []byte) (int, error) { + size := m.Size() + return m.MarshalToSizedBuffer(dAtA[:size]) +} + +func (m *ReadStateRequest) MarshalToSizedBuffer(dAtA []byte) (int, error) { + i := len(dAtA) + _ = i + var l int + _ = l + return len(dAtA) - i, nil +} + +func (m *ReadStateResponse) Marshal() (dAtA []byte, err error) { + size := m.Size() + dAtA = make([]byte, size) + n, err := m.MarshalToSizedBuffer(dAtA[:size]) + if err != nil { + return nil, err + } + return dAtA[:n], nil +} + +func (m *ReadStateResponse) MarshalTo(dAtA []byte) (int, error) { + size := m.Size() + return m.MarshalToSizedBuffer(dAtA[:size]) +} + +func (m *ReadStateResponse) MarshalToSizedBuffer(dAtA []byte) (int, error) { + i := len(dAtA) + _ = i + var l int + _ = l + if m.State != nil { + { + size, err := m.State.MarshalToSizedBuffer(dAtA[:i]) + if err != nil { + return 0, err + } + i -= size + i = encodeVarintAlertmanager(dAtA, i, uint64(size)) + } + i-- + dAtA[i] = 0x1a + } + if len(m.Error) > 0 { + i -= len(m.Error) + copy(dAtA[i:], m.Error) + i = encodeVarintAlertmanager(dAtA, i, uint64(len(m.Error))) + i-- + dAtA[i] = 0x12 + } + if m.Status != 0 { + i = encodeVarintAlertmanager(dAtA, i, uint64(m.Status)) + i-- + dAtA[i] = 0x8 + } + return len(dAtA) - i, nil +} + func 
encodeVarintAlertmanager(dAtA []byte, offset int, v uint64) int { offset -= sovAlertmanager(v) base := offset @@ -373,6 +663,35 @@ func (m *UpdateStateResponse) Size() (n int) { return n } +func (m *ReadStateRequest) Size() (n int) { + if m == nil { + return 0 + } + var l int + _ = l + return n +} + +func (m *ReadStateResponse) Size() (n int) { + if m == nil { + return 0 + } + var l int + _ = l + if m.Status != 0 { + n += 1 + sovAlertmanager(uint64(m.Status)) + } + l = len(m.Error) + if l > 0 { + n += 1 + l + sovAlertmanager(uint64(l)) + } + if m.State != nil { + l = m.State.Size() + n += 1 + l + sovAlertmanager(uint64(l)) + } + return n +} + func sovAlertmanager(x uint64) (n int) { return (math_bits.Len64(x|1) + 6) / 7 } @@ -390,6 +709,27 @@ func (this *UpdateStateResponse) String() string { }, "") return s } +func (this *ReadStateRequest) String() string { + if this == nil { + return "nil" + } + s := strings.Join([]string{`&ReadStateRequest{`, + `}`, + }, "") + return s +} +func (this *ReadStateResponse) String() string { + if this == nil { + return "nil" + } + s := strings.Join([]string{`&ReadStateResponse{`, + `Status:` + fmt.Sprintf("%v", this.Status) + `,`, + `Error:` + fmt.Sprintf("%v", this.Error) + `,`, + `State:` + strings.Replace(fmt.Sprintf("%v", this.State), "FullState", "clusterpb.FullState", 1) + `,`, + `}`, + }, "") + return s +} func valueToStringAlertmanager(v interface{}) string { rv := reflect.ValueOf(v) if rv.IsNil() { @@ -502,6 +842,199 @@ func (m *UpdateStateResponse) Unmarshal(dAtA []byte) error { } return nil } +func (m *ReadStateRequest) Unmarshal(dAtA []byte) error { + l := len(dAtA) + iNdEx := 0 + for iNdEx < l { + preIndex := iNdEx + var wire uint64 + for shift := uint(0); ; shift += 7 { + if shift >= 64 { + return ErrIntOverflowAlertmanager + } + if iNdEx >= l { + return io.ErrUnexpectedEOF + } + b := dAtA[iNdEx] + iNdEx++ + wire |= uint64(b&0x7F) << shift + if b < 0x80 { + break + } + } + fieldNum := int32(wire >> 3) + wireType := int(wire & 0x7) + if wireType == 4 { + return fmt.Errorf("proto: ReadStateRequest: wiretype end group for non-group") + } + if fieldNum <= 0 { + return fmt.Errorf("proto: ReadStateRequest: illegal tag %d (wire type %d)", fieldNum, wire) + } + switch fieldNum { + default: + iNdEx = preIndex + skippy, err := skipAlertmanager(dAtA[iNdEx:]) + if err != nil { + return err + } + if skippy < 0 { + return ErrInvalidLengthAlertmanager + } + if (iNdEx + skippy) < 0 { + return ErrInvalidLengthAlertmanager + } + if (iNdEx + skippy) > l { + return io.ErrUnexpectedEOF + } + iNdEx += skippy + } + } + + if iNdEx > l { + return io.ErrUnexpectedEOF + } + return nil +} +func (m *ReadStateResponse) Unmarshal(dAtA []byte) error { + l := len(dAtA) + iNdEx := 0 + for iNdEx < l { + preIndex := iNdEx + var wire uint64 + for shift := uint(0); ; shift += 7 { + if shift >= 64 { + return ErrIntOverflowAlertmanager + } + if iNdEx >= l { + return io.ErrUnexpectedEOF + } + b := dAtA[iNdEx] + iNdEx++ + wire |= uint64(b&0x7F) << shift + if b < 0x80 { + break + } + } + fieldNum := int32(wire >> 3) + wireType := int(wire & 0x7) + if wireType == 4 { + return fmt.Errorf("proto: ReadStateResponse: wiretype end group for non-group") + } + if fieldNum <= 0 { + return fmt.Errorf("proto: ReadStateResponse: illegal tag %d (wire type %d)", fieldNum, wire) + } + switch fieldNum { + case 1: + if wireType != 0 { + return fmt.Errorf("proto: wrong wireType = %d for field Status", wireType) + } + m.Status = 0 + for shift := uint(0); ; shift += 7 { + if shift >= 64 { + return 
ErrIntOverflowAlertmanager + } + if iNdEx >= l { + return io.ErrUnexpectedEOF + } + b := dAtA[iNdEx] + iNdEx++ + m.Status |= ReadStateStatus(b&0x7F) << shift + if b < 0x80 { + break + } + } + case 2: + if wireType != 2 { + return fmt.Errorf("proto: wrong wireType = %d for field Error", wireType) + } + var stringLen uint64 + for shift := uint(0); ; shift += 7 { + if shift >= 64 { + return ErrIntOverflowAlertmanager + } + if iNdEx >= l { + return io.ErrUnexpectedEOF + } + b := dAtA[iNdEx] + iNdEx++ + stringLen |= uint64(b&0x7F) << shift + if b < 0x80 { + break + } + } + intStringLen := int(stringLen) + if intStringLen < 0 { + return ErrInvalidLengthAlertmanager + } + postIndex := iNdEx + intStringLen + if postIndex < 0 { + return ErrInvalidLengthAlertmanager + } + if postIndex > l { + return io.ErrUnexpectedEOF + } + m.Error = string(dAtA[iNdEx:postIndex]) + iNdEx = postIndex + case 3: + if wireType != 2 { + return fmt.Errorf("proto: wrong wireType = %d for field State", wireType) + } + var msglen int + for shift := uint(0); ; shift += 7 { + if shift >= 64 { + return ErrIntOverflowAlertmanager + } + if iNdEx >= l { + return io.ErrUnexpectedEOF + } + b := dAtA[iNdEx] + iNdEx++ + msglen |= int(b&0x7F) << shift + if b < 0x80 { + break + } + } + if msglen < 0 { + return ErrInvalidLengthAlertmanager + } + postIndex := iNdEx + msglen + if postIndex < 0 { + return ErrInvalidLengthAlertmanager + } + if postIndex > l { + return io.ErrUnexpectedEOF + } + if m.State == nil { + m.State = &clusterpb.FullState{} + } + if err := m.State.Unmarshal(dAtA[iNdEx:postIndex]); err != nil { + return err + } + iNdEx = postIndex + default: + iNdEx = preIndex + skippy, err := skipAlertmanager(dAtA[iNdEx:]) + if err != nil { + return err + } + if skippy < 0 { + return ErrInvalidLengthAlertmanager + } + if (iNdEx + skippy) < 0 { + return ErrInvalidLengthAlertmanager + } + if (iNdEx + skippy) > l { + return io.ErrUnexpectedEOF + } + iNdEx += skippy + } + } + + if iNdEx > l { + return io.ErrUnexpectedEOF + } + return nil +} func skipAlertmanager(dAtA []byte) (n int, err error) { l := len(dAtA) iNdEx := 0 diff --git a/pkg/alertmanager/alertmanagerpb/alertmanager.proto b/pkg/alertmanager/alertmanagerpb/alertmanager.proto index 1450ac0f984..2ff154ddfea 100644 --- a/pkg/alertmanager/alertmanagerpb/alertmanager.proto +++ b/pkg/alertmanager/alertmanagerpb/alertmanager.proto @@ -1,6 +1,7 @@ syntax = "proto3"; package alertmanagerpb; +import "gogoproto/gogo.proto"; option go_package = "alertmanagerpb"; @@ -11,6 +12,7 @@ import "github.com/prometheus/alertmanager/cluster/clusterpb/cluster.proto"; service Alertmanager { rpc HandleRequest(httpgrpc.HTTPRequest) returns(httpgrpc.HTTPResponse) {}; rpc UpdateState(clusterpb.Part) returns (UpdateStateResponse) {}; + rpc ReadState(ReadStateRequest) returns (ReadStateResponse) {}; } enum UpdateStateStatus { OK = 0; @@ -23,3 +25,22 @@ message UpdateStateResponse { string error = 2; } +message ReadStateRequest { +} + +enum ReadStateStatus { + READ_UNSPECIFIED = 0; + READ_OK = 1; + READ_ERROR = 2; + READ_USER_NOT_FOUND = 3; +} + +message ReadStateResponse { + // Alertmanager (clusterpb) types do not have Equal methods. 
+ option (gogoproto.equal) = false; + + ReadStateStatus status = 1; + string error = 2; + clusterpb.FullState state = 3; +} + diff --git a/pkg/alertmanager/multitenant.go b/pkg/alertmanager/multitenant.go index 6fd62961fba..07924e7933e 100644 --- a/pkg/alertmanager/multitenant.go +++ b/pkg/alertmanager/multitenant.go @@ -35,6 +35,7 @@ import ( "github.com/cortexproject/cortex/pkg/ring/kv" "github.com/cortexproject/cortex/pkg/tenant" "github.com/cortexproject/cortex/pkg/util" + "github.com/cortexproject/cortex/pkg/util/concurrency" "github.com/cortexproject/cortex/pkg/util/flagext" util_log "github.com/cortexproject/cortex/pkg/util/log" "github.com/cortexproject/cortex/pkg/util/services" @@ -1011,6 +1012,69 @@ func (am *MultitenantAlertmanager) ReplicateStateForUser(ctx context.Context, us return err } +// ReadFullStateForUser attempts to read the full state from each replica for user. Note that it will try to obtain and return +// state from all replicas, but will consider it a success if state is obtained from at least one replica. +func (am *MultitenantAlertmanager) ReadFullStateForUser(ctx context.Context, userID string) ([]*clusterpb.FullState, error) { + + // Only get the set of replicas which contain the specified user. + key := shardByUser(userID) + replicationSet, err := am.ring.Get(key, RingOp, nil, nil, nil) + if err != nil { + return nil, err + } + + // We should only query state from other replicas, and not our own state. + addrs := replicationSet.GetAddressesWithout(am.ringLifecycler.GetInstanceAddr()) + + var ( + resultsMtx sync.Mutex + results []*clusterpb.FullState + ) + + // Note that the jobs swallow the errors - this is because we want to give each replica a chance to respond. + jobs := concurrency.CreateJobsFromStrings(addrs) + err = concurrency.ForEach(ctx, jobs, len(jobs), func(ctx context.Context, job interface{}) error { + addr := job.(string) + level.Debug(am.logger).Log("msg", "contacting replica for full state", "user", userID, "addr", addr) + + c, err := am.alertmanagerClientsPool.GetClientFor(addr) + if err != nil { + level.Error(am.logger).Log("msg", "failed to get rpc client", "err", err) + return nil + } + + resp, err := c.ReadState(user.InjectOrgID(ctx, userID), &alertmanagerpb.ReadStateRequest{}) + if err != nil { + level.Error(am.logger).Log("msg", "rpc reading state from replica failed", "addr", addr, "user", userID, "err", err) + return nil + } + + switch resp.Status { + case alertmanagerpb.READ_OK: + resultsMtx.Lock() + results = append(results, resp.State) + resultsMtx.Unlock() + case alertmanagerpb.READ_ERROR: + level.Error(am.logger).Log("msg", "error trying to read state", "addr", addr, "user", userID, "err", resp.Error) + case alertmanagerpb.READ_USER_NOT_FOUND: + level.Debug(am.logger).Log("msg", "user not found while trying to read state", "addr", addr, "user", userID) + default: + level.Error(am.logger).Log("msg", "unknown response trying to read state", "addr", addr, "user", userID) + } + return nil + }) + if err != nil { + return nil, err + } + + // We only require the state from a single replica, though we return as many as we were able to obtain. + if len(results) == 0 { + return nil, fmt.Errorf("failed to read state from any replica") + } + + return results, nil +} + // UpdateState implements the Alertmanager service. 
func (am *MultitenantAlertmanager) UpdateState(ctx context.Context, part *clusterpb.Part) (*alertmanagerpb.UpdateStateResponse, error) { userID, err := tenant.TenantID(ctx) @@ -1089,6 +1153,39 @@ func (am *MultitenantAlertmanager) getPerUserDirectories() map[string]string { return result } +// ReadState implements the Alertmanager service. +func (am *MultitenantAlertmanager) ReadState(ctx context.Context, req *alertmanagerpb.ReadStateRequest) (*alertmanagerpb.ReadStateResponse, error) { + userID, err := tenant.TenantID(ctx) + if err != nil { + return nil, err + } + + am.alertmanagersMtx.Lock() + userAM, ok := am.alertmanagers[userID] + am.alertmanagersMtx.Unlock() + + if !ok { + level.Debug(am.logger).Log("msg", "user does not have an alertmanager in this instance", "user", userID) + return &alertmanagerpb.ReadStateResponse{ + Status: alertmanagerpb.READ_USER_NOT_FOUND, + Error: "alertmanager for this user does not exist", + }, nil + } + + state, err := userAM.getFullState() + if err != nil { + return &alertmanagerpb.ReadStateResponse{ + Status: alertmanagerpb.READ_ERROR, + Error: err.Error(), + }, nil + } + + return &alertmanagerpb.ReadStateResponse{ + Status: alertmanagerpb.READ_OK, + State: state, + }, nil +} + // StatusHandler shows the status of the alertmanager. type StatusHandler struct { am *MultitenantAlertmanager diff --git a/pkg/alertmanager/multitenant_test.go b/pkg/alertmanager/multitenant_test.go index f3e625e2d96..edc2f27cd6c 100644 --- a/pkg/alertmanager/multitenant_test.go +++ b/pkg/alertmanager/multitenant_test.go @@ -1205,6 +1205,185 @@ func TestAlertmanager_StateReplicationWithSharding(t *testing.T) { } } +func TestAlertmanager_StateReplicationWithSharding_InitialSyncFromPeers(t *testing.T) { + tc := []struct { + name string + replicationFactor int + }{ + { + name: "RF = 2", + replicationFactor: 2, + }, + { + name: "RF = 3", + replicationFactor: 3, + }, + } + + for _, tt := range tc { + t.Run(tt.name, func(t *testing.T) { + ctx := context.Background() + ringStore := consul.NewInMemoryClient(ring.GetCodec()) + mockStore := prepareInMemoryAlertStore() + clientPool := newPassthroughAlertmanagerClientPool() + externalURL := flagext.URLValue{} + err := externalURL.Set("http://localhost:8080/alertmanager") + require.NoError(t, err) + + var instances []*MultitenantAlertmanager + var instanceIDs []string + registries := util.NewUserRegistries() + + // Create only two users - no need for more for these test cases. + for i := 1; i <= 2; i++ { + u := fmt.Sprintf("u-%d", i) + require.NoError(t, mockStore.SetAlertConfig(ctx, alertspb.AlertConfigDesc{ + User: u, + RawConfig: simpleConfigOne, + Templates: []*alertspb.TemplateDesc{}, + })) + } + + createInstance := func(i int) *MultitenantAlertmanager { + instanceIDs = append(instanceIDs, fmt.Sprintf("alertmanager-%d", i)) + instanceID := fmt.Sprintf("alertmanager-%d", i) + + amConfig := mockAlertmanagerConfig(t) + amConfig.ExternalURL = externalURL + amConfig.ShardingRing.ReplicationFactor = tt.replicationFactor + amConfig.ShardingRing.InstanceID = instanceID + amConfig.ShardingRing.InstanceAddr = fmt.Sprintf("127.0.0.%d", i) + + // Do not check the ring topology changes or poll in an interval in this test (we explicitly sync alertmanagers).
+ amConfig.PollInterval = time.Hour + amConfig.ShardingRing.RingCheckPeriod = time.Hour + + amConfig.ShardingEnabled = true + + reg := prometheus.NewPedanticRegistry() + am, err := createMultitenantAlertmanager(amConfig, nil, nil, mockStore, ringStore, log.NewNopLogger(), reg) + require.NoError(t, err) + + clientPool.servers[amConfig.ShardingRing.InstanceAddr+":0"] = am + am.alertmanagerClientsPool = clientPool + + require.NoError(t, services.StartAndAwaitRunning(ctx, am)) + t.Cleanup(func() { + require.NoError(t, services.StopAndAwaitTerminated(ctx, am)) + }) + + instances = append(instances, am) + instanceIDs = append(instanceIDs, instanceID) + registries.AddUserRegistry(instanceID, reg) + + // Make sure the ring is settled. + { + ctx, cancel := context.WithTimeout(ctx, 10*time.Second) + defer cancel() + + // The alertmanager is ready to be tested once all instances are ACTIVE and the ring settles. + for _, am := range instances { + for _, id := range instanceIDs { + require.NoError(t, ring.WaitInstanceState(ctx, am.ring, id, ring.ACTIVE)) + } + } + } + + // Now that the ring has settled, sync configs with the instances. + require.NoError(t, am.loadAndSyncConfigs(ctx, reasonRingChange)) + + return am + } + + writeSilence := func(i *MultitenantAlertmanager, userID string) { + silence := types.Silence{ + Matchers: labels.Matchers{ + {Name: "instance", Value: "prometheus-one"}, + }, + Comment: "Created for a test case.", + StartsAt: time.Now(), + EndsAt: time.Now().Add(time.Hour), + } + data, err := json.Marshal(silence) + require.NoError(t, err) + + req := httptest.NewRequest(http.MethodPost, externalURL.String()+"/api/v2/silences", bytes.NewReader(data)) + req.Header.Set("content-type", "application/json") + reqCtx := user.InjectOrgID(req.Context(), userID) + { + w := httptest.NewRecorder() + i.ServeHTTP(w, req.WithContext(reqCtx)) + + resp := w.Result() + body, _ := ioutil.ReadAll(resp.Body) + assert.Equal(t, http.StatusOK, w.Code) + require.Regexp(t, regexp.MustCompile(`{"silenceID":".+"}`), string(body)) + } + } + + checkSilence := func(i *MultitenantAlertmanager, userID string) { + req := httptest.NewRequest(http.MethodGet, externalURL.String()+"/api/v2/silences", nil) + req.Header.Set("content-type", "application/json") + reqCtx := user.InjectOrgID(req.Context(), userID) + { + w := httptest.NewRecorder() + i.ServeHTTP(w, req.WithContext(reqCtx)) + + resp := w.Result() + body, _ := ioutil.ReadAll(resp.Body) + assert.Equal(t, http.StatusOK, w.Code) + require.Regexp(t, regexp.MustCompile(`"comment":"Created for a test case."`), string(body)) + } + } + + // 1. Create the first instance and load the user configurations. + i1 := createInstance(1) + + // 2. Create a silence in the first alertmanager instance and check we can read it. + writeSilence(i1, "u-1") + // 2.a. Check the silence was created (paranoia). + checkSilence(i1, "u-1") + // 2.b. Check the relevant metrics were updated. + { + metrics := registries.BuildMetricFamiliesPerUser() + assert.Equal(t, float64(1), metrics.GetSumOfGauges("cortex_alertmanager_silences")) + assert.Equal(t, float64(1), metrics.GetSumOfCounters("cortex_alertmanager_state_replication_total")) + assert.Equal(t, float64(0), metrics.GetSumOfCounters("cortex_alertmanager_state_replication_failed_total")) + } + + // 3. Create a second instance. This should attempt to fetch the silence from the first. + i2 := createInstance(2) + + // 3.a. Check the silence was fetched from the first instance successfully. + checkSilence(i2, "u-1") + + // 3.b. 
Check the metrics: We should see the additional silences without any replication activity. + { + metrics := registries.BuildMetricFamiliesPerUser() + assert.Equal(t, float64(2), metrics.GetSumOfGauges("cortex_alertmanager_silences")) + assert.Equal(t, float64(1), metrics.GetSumOfCounters("cortex_alertmanager_state_replication_total")) + assert.Equal(t, float64(0), metrics.GetSumOfCounters("cortex_alertmanager_state_replication_failed_total")) + } + + if tt.replicationFactor >= 3 { + // 4. When testing RF = 3, create a third instance, to test obtaining state from multiple places. + i3 := createInstance(3) + + // 4.a. Check the silence was fetched from one or both of the instances successfully. + checkSilence(i3, "u-1") + + // 4.b. Check the metrics one more time. We should have three replicas of the silence. + { + metrics := registries.BuildMetricFamiliesPerUser() + assert.Equal(t, float64(3), metrics.GetSumOfGauges("cortex_alertmanager_silences")) + assert.Equal(t, float64(1), metrics.GetSumOfCounters("cortex_alertmanager_state_replication_total")) + assert.Equal(t, float64(0), metrics.GetSumOfCounters("cortex_alertmanager_state_replication_failed_total")) + } + } + }) + } +} + // prepareInMemoryAlertStore builds and returns an in-memory alert store. func prepareInMemoryAlertStore() alertstore.AlertStore { return bucketclient.NewBucketAlertStore(objstore.NewInMemBucket(), nil, log.NewNopLogger()) } @@ -1251,6 +1430,10 @@ func (am *passthroughAlertmanagerClient) UpdateState(ctx context.Context, in *cl return am.server.UpdateState(ctx, in) } +func (am *passthroughAlertmanagerClient) ReadState(ctx context.Context, in *alertmanagerpb.ReadStateRequest, opts ...grpc.CallOption) (*alertmanagerpb.ReadStateResponse, error) { + return am.server.ReadState(ctx, in) +} + func (am *passthroughAlertmanagerClient) HandleRequest(context.Context, *httpgrpc.HTTPRequest, ...grpc.CallOption) (*httpgrpc.HTTPResponse, error) { return nil, fmt.Errorf("unexpected call to HandleRequest") } diff --git a/pkg/alertmanager/state_replication.go b/pkg/alertmanager/state_replication.go index 17fc686ee5d..23dac007bfe 100644 --- a/pkg/alertmanager/state_replication.go +++ b/pkg/alertmanager/state_replication.go @@ -4,11 +4,13 @@ import ( "context" "fmt" "sync" + "time" "github.com/prometheus/client_golang/prometheus/promauto" "github.com/go-kit/kit/log" "github.com/go-kit/kit/log/level" + "github.com/pkg/errors" "github.com/prometheus/alertmanager/cluster" "github.com/prometheus/alertmanager/cluster/clusterpb" "github.com/prometheus/client_golang/prometheus" @@ -16,6 +18,10 @@ import ( "github.com/cortexproject/cortex/pkg/util/services" ) +const ( + defaultSettleReadTimeout = 15 * time.Second +) + // state represents the Alertmanager silences and notification log internal state. type state struct { services.Service @@ -24,6 +30,8 @@ type state struct { logger log.Logger reg prometheus.Registerer + settleReadTimeout time.Duration + mtx sync.Mutex states map[string]cluster.State @@ -49,6 +57,7 @@ func newReplicatedStates(userID string, rf int, re Replicator, l log.Logger, r p states: make(map[string]cluster.State, 2), // we use two, one for the notifications and one for silences.
msgc: make(chan *clusterpb.Part), reg: r, + settleReadTimeout: defaultSettleReadTimeout, partialStateMergesTotal: promauto.With(r).NewCounterVec(prometheus.CounterOpts{ Name: "alertmanager_partial_state_merges_total", Help: "Number of times we have received a partial state to merge for a key.", @@ -85,8 +94,8 @@ func (s *state) AddState(key string, cs cluster.State, _ prometheus.Registerer) s.stateReplicationFailed.WithLabelValues(key) return &stateChannel{ - msgc: s.msgc, - key: key, + s: s, + key: key, } } @@ -115,13 +124,50 @@ func (s *state) Position() int { return s.replicator.GetPositionForUser(s.userID) } +// GetFullState returns the full internal state. +func (s *state) GetFullState() (*clusterpb.FullState, error) { + s.mtx.Lock() + defer s.mtx.Unlock() + + all := &clusterpb.FullState{ + Parts: make([]clusterpb.Part, 0, len(s.states)), + } + + for key, s := range s.states { + b, err := s.MarshalBinary() + if err != nil { + return nil, errors.Wrapf(err, "failed to encode state for key: %v", key) + } + all.Parts = append(all.Parts, clusterpb.Part{Key: key, Data: b}) + } + + return all, nil +} + // starting waits until the alertmanagers are ready (and sets the appropriate internal state when it is). // The idea is that we don't want to start working" before we get a chance to know most of the notifications and/or silences. func (s *state) starting(ctx context.Context) error { level.Info(s.logger).Log("msg", "Waiting for notification and silences to settle...") - // TODO: Make sure that the state is fully synchronised at this point. + // If the replication factor is <= 1, there is nowhere to obtain the state from. + if s.replicationFactor <= 1 { + level.Info(s.logger).Log("msg", "skipping settling (no replicas)") + return nil + } + // We can check other alertmanager(s) and explicitly ask them to propagate their state to us if available. + readCtx, cancel := context.WithTimeout(ctx, s.settleReadTimeout) + defer cancel() + + fullStates, err := s.replicator.ReadFullStateForUser(readCtx, s.userID) + if err == nil { + if err = s.mergeFullStates(fullStates); err == nil { + level.Info(s.logger).Log("msg", "state settled; proceeding") + return nil + } + } + + level.Info(s.logger).Log("msg", "state not settled but continuing anyway", "err", err) return nil } @@ -134,6 +180,30 @@ func (s *state) Ready() bool { return s.Service.State() == services.Running } +// mergeFullStates attempts to merge all full states received from peers during settling. +func (s *state) mergeFullStates(fs []*clusterpb.FullState) error { + s.mtx.Lock() + defer s.mtx.Unlock() + + for _, f := range fs { + for _, p := range f.Parts { + level.Debug(s.logger).Log("msg", "merging full state", "user", s.userID, "key", p.Key, "bytes", len(p.Data)) + + st, ok := s.states[p.Key] + if !ok { + level.Error(s.logger).Log("msg", "key not found while merging full state", "user", s.userID, "key", p.Key) + continue + } + + if err := st.Merge(p.Data); err != nil { + return errors.Wrapf(err, "failed to merge part of full state for key: %v", p.Key) + } + } + } + + return nil +} + func (s *state) running(ctx context.Context) error { for { select { @@ -154,14 +224,21 @@ func (s *state) running(ctx context.Context) error { } } +func (s *state) broadcast(key string, b []byte) { + // We should ignore the Merges into the initial state during settling. 
+ if s.Ready() { + s.msgc <- &clusterpb.Part{Key: key, Data: b} + } +} + // stateChannel allows a state publisher to send messages that will be broadcasted to all other alertmanagers that a tenant // belongs to. type stateChannel struct { - msgc chan *clusterpb.Part - key string + s *state + key string } // Broadcast receives a message to be replicated by the state. func (c *stateChannel) Broadcast(b []byte) { - c.msgc <- &clusterpb.Part{Key: c.key, Data: b} + c.s.broadcast(c.key, b) } diff --git a/pkg/alertmanager/state_replication_test.go b/pkg/alertmanager/state_replication_test.go index 198157e0f5a..59c758dc906 100644 --- a/pkg/alertmanager/state_replication_test.go +++ b/pkg/alertmanager/state_replication_test.go @@ -2,6 +2,8 @@ package alertmanager import ( "context" + "errors" + "sort" "strings" "sync" "testing" @@ -19,18 +21,30 @@ import ( "github.com/cortexproject/cortex/pkg/util/services" ) -type fakeState struct{} +type fakeState struct { + binary []byte + merges [][]byte +} -func (s fakeState) MarshalBinary() ([]byte, error) { - return []byte{}, nil +func (s *fakeState) MarshalBinary() ([]byte, error) { + return s.binary, nil } -func (s fakeState) Merge(_ []byte) error { + +func (s *fakeState) Merge(data []byte) error { + s.merges = append(s.merges, data) return nil } +type readStateResult struct { + res []*clusterpb.FullState + err error + blocking bool +} + type fakeReplicator struct { mtx sync.Mutex results map[string]*clusterpb.Part + read readStateResult } func newFakeReplicator() *fakeReplicator { @@ -50,6 +64,18 @@ func (f *fakeReplicator) GetPositionForUser(_ string) int { return 0 } +func (f *fakeReplicator) ReadFullStateForUser(ctx context.Context, userID string) ([]*clusterpb.FullState, error) { + if userID != "user-1" { + return nil, errors.New("Unexpected userID") + } + + if f.read.blocking { + <-ctx.Done() + return nil, ctx.Err() + } + return f.read.res, f.read.err +} + func TestStateReplication(t *testing.T) { tc := []struct { name string @@ -75,6 +101,7 @@ func TestStateReplication(t *testing.T) { t.Run(tt.name, func(t *testing.T) { reg := prometheus.NewPedanticRegistry() replicator := newFakeReplicator() + replicator.read = readStateResult{res: nil, err: nil} s := newReplicatedStates("user-1", tt.replicationFactor, replicator, log.NewNopLogger(), reg) require.False(t, s.Ready()) @@ -129,3 +156,186 @@ alertmanager_state_replication_total{key="nflog"} 1 }) } } + +func TestStateReplication_Settle(t *testing.T) { + + tc := []struct { + name string + replicationFactor int + read readStateResult + results map[string][][]byte + }{ + { + name: "with a replication factor of <= 1, no state can be read from peers.", + replicationFactor: 1, + read: readStateResult{}, + results: map[string][][]byte{ + "key1": nil, + "key2": nil, + }, + }, + { + name: "with a replication factor of > 1, state is read from all peers.", + replicationFactor: 3, + read: readStateResult{ + res: []*clusterpb.FullState{ + {Parts: []clusterpb.Part{{Key: "key1", Data: []byte("Datum1")}, {Key: "key2", Data: []byte("Datum2")}}}, + {Parts: []clusterpb.Part{{Key: "key1", Data: []byte("Datum3")}, {Key: "key2", Data: []byte("Datum4")}}}, + }, + }, + results: map[string][][]byte{ + "key1": {[]byte("Datum1"), []byte("Datum3")}, + "key2": {[]byte("Datum2"), []byte("Datum4")}, + }, + }, + { + name: "with full state having no parts, nothing is merged.", + replicationFactor: 3, + read: readStateResult{ + res: []*clusterpb.FullState{{Parts: []clusterpb.Part{}}}, + }, + results: map[string][][]byte{ + "key1": nil, 
+ "key2": nil, + }, + }, + { + name: "with an unknown key, parts in the same state are merged.", + replicationFactor: 3, + read: readStateResult{ + res: []*clusterpb.FullState{{Parts: []clusterpb.Part{ + {Key: "unknown", Data: []byte("Wow")}, + {Key: "key1", Data: []byte("Datum1")}, + }}}, + }, + results: map[string][][]byte{ + "key1": {[]byte("Datum1")}, + "key2": nil, + }, + }, + { + name: "with an unknown key, parts in other states are merged.", + replicationFactor: 3, + read: readStateResult{ + res: []*clusterpb.FullState{ + {Parts: []clusterpb.Part{{Key: "unknown", Data: []byte("Wow")}}}, + {Parts: []clusterpb.Part{{Key: "key1", Data: []byte("Datum1")}}}, + }, + }, + results: map[string][][]byte{ + "key1": {[]byte("Datum1")}, + "key2": nil, + }, + }, + { + name: "when reading the full state fails, still become ready.", + replicationFactor: 3, + read: readStateResult{err: errors.New("Read Error 1")}, + results: map[string][][]byte{ + "key1": nil, + "key2": nil, + }, + }, + { + name: "when reading the full state takes too long, hit timeout but become ready.", + replicationFactor: 3, + read: readStateResult{blocking: true}, + results: map[string][][]byte{ + "key1": nil, + "key2": nil, + }, + }, + } + + for _, tt := range tc { + t.Run(tt.name, func(t *testing.T) { + reg := prometheus.NewPedanticRegistry() + + replicator := newFakeReplicator() + replicator.read = tt.read + s := newReplicatedStates("user-1", tt.replicationFactor, replicator, log.NewNopLogger(), reg) + + key1State := &fakeState{} + key2State := &fakeState{} + + s.AddState("key1", key1State, reg) + s.AddState("key2", key2State, reg) + + s.settleReadTimeout = 1 * time.Second + + assert.False(t, s.Ready()) + + require.NoError(t, services.StartAndAwaitRunning(context.Background(), s)) + t.Cleanup(func() { + require.NoError(t, services.StopAndAwaitTerminated(context.Background(), s)) + }) + + assert.True(t, s.Ready()) + + // Note: We don't actually test beyond Merge() here, just that all data is forwarded. + assert.Equal(t, tt.results["key1"], key1State.merges) + assert.Equal(t, tt.results["key2"], key2State.merges) + }) + } +} + +func TestStateReplication_GetFullState(t *testing.T) { + + tc := []struct { + name string + data map[string][]byte + result *clusterpb.FullState + }{ + { + name: "no keys", + data: map[string][]byte{}, + result: &clusterpb.FullState{ + Parts: []clusterpb.Part{}, + }, + }, + { + name: "zero length data", + data: map[string][]byte{ + "key1": {}, + }, + result: &clusterpb.FullState{ + Parts: []clusterpb.Part{ + {Key: "key1", Data: []byte{}}, + }, + }, + }, + { + name: "keys with data", + data: map[string][]byte{ + "key1": []byte("Datum1"), + "key2": []byte("Datum2"), + }, + result: &clusterpb.FullState{ + Parts: []clusterpb.Part{ + {Key: "key1", Data: []byte("Datum1")}, + {Key: "key2", Data: []byte("Datum2")}, + }, + }, + }, + } + + for _, tt := range tc { + t.Run(tt.name, func(t *testing.T) { + reg := prometheus.NewPedanticRegistry() + s := newReplicatedStates("user-1", 1, nil, log.NewNopLogger(), reg) + + for key, datum := range tt.data { + state := &fakeState{binary: datum} + s.AddState(key, state, reg) + } + + result, err := s.GetFullState() + require.NoError(t, err) + + // Key ordering is undefined for the code under test. 
+ sort.Slice(result.Parts, func(i, j int) bool { return result.Parts[i].Key < result.Parts[j].Key }) + + assert.Equal(t, tt.result, result) + }) + } +} diff --git a/pkg/ring/replication_set.go b/pkg/ring/replication_set.go index 253a7feb1a8..47485895a54 100644 --- a/pkg/ring/replication_set.go +++ b/pkg/ring/replication_set.go @@ -115,6 +115,18 @@ func (r ReplicationSet) GetAddresses() []string { return addrs } +// GetAddressesWithout returns the addresses of all instances within the replication set while +// excluding the specified address. Returned slice order is not guaranteed. +func (r ReplicationSet) GetAddressesWithout(exclude string) []string { + addrs := make([]string, 0, len(r.Instances)) + for _, desc := range r.Instances { + if desc.Addr != exclude { + addrs = append(addrs, desc.Addr) + } + } + return addrs +} + // HasReplicationSetChanged returns true if two replications sets are the same (with possibly different timestamps), // false if they differ in any way (number of instances, instance states, tokens, zones, ...). func HasReplicationSetChanged(before, after ReplicationSet) bool { diff --git a/pkg/ring/replication_set_test.go b/pkg/ring/replication_set_test.go index 2502a8336ae..74ef17ba163 100644 --- a/pkg/ring/replication_set_test.go +++ b/pkg/ring/replication_set_test.go @@ -39,6 +39,48 @@ func TestReplicationSet_GetAddresses(t *testing.T) { } } +func TestReplicationSet_GetAddressesWithout(t *testing.T) { + tests := map[string]struct { + rs ReplicationSet + expected []string + exclude string + }{ + "should return an empty slice on empty replication set": { + rs: ReplicationSet{}, + expected: []string{}, + exclude: "127.0.0.1", + }, + "non-matching exclusion, should return all addresses": { + rs: ReplicationSet{ + Instances: []InstanceDesc{ + {Addr: "127.0.0.1"}, + {Addr: "127.0.0.2"}, + {Addr: "127.0.0.3"}, + }, + }, + expected: []string{"127.0.0.1", "127.0.0.2", "127.0.0.3"}, + exclude: "127.0.0.4", + }, + "matching exclusion, should return non-excluded addresses": { + rs: ReplicationSet{ + Instances: []InstanceDesc{ + {Addr: "127.0.0.1"}, + {Addr: "127.0.0.2"}, + {Addr: "127.0.0.3"}, + }, + }, + expected: []string{"127.0.0.1", "127.0.0.3"}, + exclude: "127.0.0.2", + }, + } + + for testName, testData := range tests { + t.Run(testName, func(t *testing.T) { + assert.ElementsMatch(t, testData.expected, testData.rs.GetAddressesWithout(testData.exclude)) + }) + } +} + var ( errFailure = errors.New("failed") errZoneFailure = errors.New("zone failed")
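
The core of the settling flow introduced above is ReadFullStateForUser: a newly started replica asks every other replica in its replication set for the tenant's full state, ignores per-replica failures so each peer gets a chance to answer, and treats the read as successful if at least one peer responds. The following is a minimal standalone Go sketch of that fan-out pattern; fullState and readPeer are hypothetical stand-ins for clusterpb.FullState and the ReadState RPC client, and plain goroutines stand in for the concurrency.ForEach helper used in the real code.

package main

import (
	"context"
	"errors"
	"fmt"
	"sync"
	"time"
)

// fullState is a hypothetical stand-in for clusterpb.FullState.
type fullState struct {
	parts map[string][]byte
}

// readPeer is a hypothetical stand-in for calling the ReadState RPC on one replica.
type readPeer func(ctx context.Context) (*fullState, error)

// readFullStateFromPeers fans out to every peer, swallows per-peer errors so each
// replica gets a chance to respond, and succeeds if at least one peer returned state.
func readFullStateFromPeers(ctx context.Context, peers []readPeer) ([]*fullState, error) {
	var (
		mtx     sync.Mutex
		results []*fullState
		wg      sync.WaitGroup
	)
	for _, peer := range peers {
		wg.Add(1)
		go func(p readPeer) {
			defer wg.Done()
			st, err := p(ctx)
			if err != nil {
				// Individual failures are only logged in the real code; settling
				// succeeds as long as some other replica can provide the state.
				return
			}
			mtx.Lock()
			results = append(results, st)
			mtx.Unlock()
		}(peer)
	}
	wg.Wait()

	if len(results) == 0 {
		return nil, errors.New("failed to read state from any replica")
	}
	return results, nil
}

func main() {
	// One healthy peer and one failing peer: settling still succeeds.
	peers := []readPeer{
		func(ctx context.Context) (*fullState, error) {
			return &fullState{parts: map[string][]byte{"sil:user-1": []byte("silences")}}, nil
		},
		func(ctx context.Context) (*fullState, error) {
			return nil, errors.New("replica unavailable")
		},
	}

	// The read is bounded by a timeout, mirroring settleReadTimeout in the patch.
	ctx, cancel := context.WithTimeout(context.Background(), 15*time.Second)
	defer cancel()

	states, err := readFullStateFromPeers(ctx, peers)
	fmt.Println(len(states), err) // prints: 1 <nil>
}

The "at least one success" rule is what lets a fresh alertmanager settle while some replicas are down; only when every peer fails, or the settle timeout expires, does it fall back to starting from an empty state, which is what the "still become ready" test cases above exercise.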