diff --git a/CHANGELOG.md b/CHANGELOG.md index fb50e20270b..31ddc38f5ee 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -21,6 +21,12 @@ * `-cluster.peer` in favor of `-alertmanager.cluster.peers` * `-cluster.peer-timeout` in favor of `-alertmanager.cluster.peer-timeout` * [FEATURE] Querier: Queries can be federated across multiple tenants. The tenants IDs involved need to be specified separated by a `|` character in the `X-Scope-OrgID` request header. This is an experimental feature, which can be enabled by setting `-tenant-federation.enabled=true` on all Cortex services. #3250 +* [FEATURE] Alertmanager: introduced the experimental option `-alertmanager.sharding-enabled` to shard tenants across multiple Alertmanager instances. This feature is still under heavy development and its usage is discouraged. The following new metrics are exported by the Alertmanager: #3664 + * `cortex_alertmanager_ring_check_errors_total` + * `cortex_alertmanager_sync_configs_total` + * `cortex_alertmanager_sync_configs_failed_total` + * `cortex_alertmanager_tenants_discovered` + * `cortex_alertmanager_tenants_owned` * [ENHANCEMENT] Blocks storage: introduced a per-tenant bucket index, periodically updated by the compactor, used to avoid full bucket scanning done by queriers, store-gateways and rulers. The bucket index is updated by the compactor during blocks cleanup, on every `-compactor.cleanup-interval`. #3553 #3555 #3561 #3583 #3625 * [ENHANCEMENT] Blocks storage: introduced an option `-blocks-storage.bucket-store.bucket-index.enabled` to enable the usage of the bucket index in the querier, store-gateway and ruler. When enabled, the querier, store-gateway and ruler will use the bucket index to find a tenant's blocks instead of running the periodic bucket scan. The following new metrics are exported by the querier and ruler: #3614 #3625 * `cortex_bucket_index_loads_total` diff --git a/docs/api/_index.md b/docs/api/_index.md index 8a453aebf47..1cc86732376 100644 --- a/docs/api/_index.md +++ b/docs/api/_index.md @@ -50,6 +50,7 @@ For the sake of clarity, in this document we have grouped API endpoints by servi | [Delete rule group](#delete-rule-group) | Ruler | `DELETE /api/v1/rules/{namespace}/{groupName}` | | [Delete namespace](#delete-namespace) | Ruler | `DELETE /api/v1/rules/{namespace}` | | [Alertmanager status](#alertmanager-status) | Alertmanager | `GET /multitenant_alertmanager/status` | +| [Alertmanager ring status](#alertmanager-ring-status) | Alertmanager | `GET /multitenant_alertmanager/ring` | | [Alertmanager UI](#alertmanager-ui) | Alertmanager | `GET /` | | [Get Alertmanager configuration](#get-alertmanager-configuration) | Alertmanager | `GET /api/v1/alerts` | | [Set Alertmanager configuration](#set-alertmanager-configuration) | Alertmanager | `POST /api/v1/alerts` | @@ -640,6 +641,14 @@ GET /status Displays a web page with the current status of the Alertmanager, including the Alertmanager cluster members. +### Alertmanager ring status + +``` +GET /multitenant_alertmanager/ring +``` + +Displays a web page with the Alertmanager hash ring status, including the state, healthy and last heartbeat time of each Alertmanager instance. + ### Alertmanager UI ``` diff --git a/docs/configuration/config-file-reference.md b/docs/configuration/config-file-reference.md index e8fc03a816e..358f63251c4 100644 --- a/docs/configuration/config-file-reference.md +++ b/docs/configuration/config-file-reference.md @@ -1501,6 +1501,64 @@ The `alertmanager_config` configures the Cortex alertmanager. 
# CLI flag: -cluster.peer-timeout [peer_timeout: | default = 15s] +# Shard tenants across multiple alertmanager instances. +# CLI flag: -alertmanager.sharding-enabled +[sharding_enabled: | default = false] + +sharding_ring: + # The key-value store used to share the hash ring across multiple instances. + kvstore: + # Backend storage to use for the ring. Supported values are: consul, etcd, + # inmemory, memberlist, multi. + # CLI flag: -alertmanager.sharding-ring.store + [store: | default = "consul"] + + # The prefix for the keys in the store. Should end with a /. + # CLI flag: -alertmanager.sharding-ring.prefix + [prefix: | default = "alertmanagers/"] + + # The consul_config configures the consul client. + # The CLI flags prefix for this block config is: alertmanager.sharding-ring + [consul: ] + + # The etcd_config configures the etcd client. + # The CLI flags prefix for this block config is: alertmanager.sharding-ring + [etcd: ] + + multi: + # Primary backend storage used by multi-client. + # CLI flag: -alertmanager.sharding-ring.multi.primary + [primary: | default = ""] + + # Secondary backend storage used by multi-client. + # CLI flag: -alertmanager.sharding-ring.multi.secondary + [secondary: | default = ""] + + # Mirror writes to secondary store. + # CLI flag: -alertmanager.sharding-ring.multi.mirror-enabled + [mirror_enabled: | default = false] + + # Timeout for storing value to secondary store. + # CLI flag: -alertmanager.sharding-ring.multi.mirror-timeout + [mirror_timeout: | default = 2s] + + # Period at which to heartbeat to the ring. + # CLI flag: -alertmanager.sharding-ring.heartbeat-period + [heartbeat_period: | default = 15s] + + # The heartbeat timeout after which alertmanagers are considered unhealthy + # within the ring. + # CLI flag: -alertmanager.sharding-ring.heartbeat-timeout + [heartbeat_timeout: | default = 1m] + + # The replication factor to use when sharding the alertmanager. + # CLI flag: -alertmanager.sharding-ring.replication-factor + [replication_factor: | default = 3] + + # Name of network interface to read address from. + # CLI flag: -alertmanager.sharding-ring.instance-interface-names + [instance_interface_names: | default = [eth0 en0]] + # Filename of fallback config to use if none specified for instance. # CLI flag: -alertmanager.configs.fallback [fallback_config_file: | default = ""] @@ -2886,6 +2944,7 @@ grpc_client_config: The `etcd_config` configures the etcd client. The supported CLI flags `` used to reference this config block are: - _no prefix_ +- `alertmanager.sharding-ring` - `compactor.ring` - `distributor.ha-tracker` - `distributor.ring` @@ -2933,6 +2992,7 @@ The `etcd_config` configures the etcd client. The supported CLI flags `` The `consul_config` configures the consul client. 
The supported CLI flags `` used to reference this config block are: - _no prefix_ +- `alertmanager.sharding-ring` - `compactor.ring` - `distributor.ha-tracker` - `distributor.ring` diff --git a/docs/configuration/v1-guarantees.md b/docs/configuration/v1-guarantees.md index 91165f39835..6eabe63226f 100644 --- a/docs/configuration/v1-guarantees.md +++ b/docs/configuration/v1-guarantees.md @@ -66,3 +66,4 @@ Currently experimental features are: - The bucket index support in the querier and store-gateway (enabled via `-blocks-storage.bucket-store.bucket-index.enabled=true`) is experimental - The block deletion marks migration support in the compactor (`-compactor.block-deletion-marks-migration-enabled`) is temporarily and will be removed in future versions - Querier: tenant federation +- Alertmanager: Sharding of tenants across multiple instances diff --git a/integration/alertmanager_test.go b/integration/alertmanager_test.go index f2bec0426d2..4189b8701c6 100644 --- a/integration/alertmanager_test.go +++ b/integration/alertmanager_test.go @@ -206,3 +206,82 @@ func TestAlertmanagerClustering(t *testing.T) { require.NoError(t, am.WaitSumMetrics(e2e.Equals(float64(2)), "alertmanager_cluster_members")) } } + +func TestAlertmanagerSharding(t *testing.T) { + s, err := e2e.NewScenario(networkName) + require.NoError(t, err) + defer s.Close() + + flags := mergeFlags(AlertmanagerFlags(), AlertmanagerS3Flags()) + + // Start dependencies. + consul := e2edb.NewConsul() + minio := e2edb.NewMinio(9000, flags["-alertmanager.storage.s3.buckets"]) + require.NoError(t, s.StartAndWaitReady(consul, minio)) + + client, err := s3.NewS3ObjectClient(s3.S3Config{ + Endpoint: minio.HTTPEndpoint(), + S3ForcePathStyle: true, + Insecure: true, + BucketNames: flags["-alertmanager.storage.s3.buckets"], + AccessKeyID: e2edb.MinioAccessKey, + SecretAccessKey: e2edb.MinioSecretKey, + }) + require.NoError(t, err) + + // Create and upload Alertmanager configurations. + for i := 1; i <= 30; i++ { + user := fmt.Sprintf("user-%d", i) + desc := alerts.AlertConfigDesc{ + RawConfig: simpleAlertmanagerConfig, + User: user, + Templates: []*alerts.TemplateDesc{}, + } + + d, err := desc.Marshal() + require.NoError(t, err) + err = client.PutObject(context.Background(), fmt.Sprintf("/alerts/%s", user), bytes.NewReader(d)) + require.NoError(t, err) + } + + // 3 instances, 30 configurations and a replication factor of 2. + flags = mergeFlags(flags, AlertmanagerShardingFlags(consul.NetworkHTTPEndpoint(), 2)) + + // Wait for the Alertmanagers to start. + alertmanager1 := e2ecortex.NewAlertmanager("alertmanager-1", flags, "") + alertmanager2 := e2ecortex.NewAlertmanager("alertmanager-2", flags, "") + alertmanager3 := e2ecortex.NewAlertmanager("alertmanager-3", flags, "") + + alertmanagers := e2ecortex.NewCompositeCortexService(alertmanager1, alertmanager2, alertmanager3) + + // Start Alertmanager instances. + for _, am := range alertmanagers.Instances() { + require.NoError(t, s.StartAndWaitReady(am)) + } + + for _, am := range alertmanagers.Instances() { + require.NoError(t, am.WaitSumMetricsWithOptions(e2e.Equals(3), []string{"cortex_ring_members"}, e2e.WithLabelMatchers( + labels.MustNewMatcher(labels.MatchEqual, "name", "alertmanager"), + labels.MustNewMatcher(labels.MatchEqual, "state", "ACTIVE"), + ))) + + // We expect every instance to discover every configuration but only own a subset of them. 
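+		// Below: every instance should discover all 30 configs, own at least one tenant, and the
+		// settled ring should hold 3 instances * 128 tokens (RingNumTokens) = 384 tokens in total.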
+ require.NoError(t, am.WaitSumMetrics(e2e.Equals(float64(30)), "cortex_alertmanager_tenants_discovered")) + // We know that the ring has settled when every instance has some tenants and the total number of tokens have been assigned. + require.NoError(t, am.WaitSumMetrics(e2e.Greater(float64(0)), "cortex_alertmanager_tenants_owned")) + require.NoError(t, am.WaitSumMetrics(e2e.Equals(float64(384)), "cortex_ring_tokens_total")) + } + + var totalTenants int + for _, am := range alertmanagers.Instances() { + values, err := am.SumMetrics([]string{"cortex_alertmanager_tenants_owned"}) + require.NoError(t, err) + + tenants := int(e2e.SumValues(values)) + totalTenants += tenants + } + + // The total number of tenants across all instances is: total alertmanager configs * replication factor. + // In this case: 30 * 2 + require.Equal(t, 60, totalTenants) +} diff --git a/integration/configs.go b/integration/configs.go index 64c4f683ddc..bc45f3b7c06 100644 --- a/integration/configs.go +++ b/integration/configs.go @@ -6,6 +6,7 @@ import ( "fmt" "os" "path/filepath" + "strconv" "strings" "text/template" @@ -102,6 +103,15 @@ var ( } } + AlertmanagerShardingFlags = func(consulAddress string, replicationFactor int) map[string]string { + return map[string]string{ + "-alertmanager.sharding-enabled": "true", + "-alertmanager.sharding-ring.store": "consul", + "-alertmanager.sharding-ring.consul.hostname": consulAddress, + "-alertmanager.sharding-ring.replication-factor": strconv.Itoa(replicationFactor), + } + } + AlertmanagerLocalFlags = func() map[string]string { return map[string]string{ "-alertmanager.storage.type": "local", diff --git a/pkg/alertmanager/alertmanager_http.go b/pkg/alertmanager/alertmanager_http.go new file mode 100644 index 00000000000..2617c58f3c4 --- /dev/null +++ b/pkg/alertmanager/alertmanager_http.go @@ -0,0 +1,53 @@ +package alertmanager + +import ( + "net/http" + "text/template" + + "github.com/go-kit/kit/log/level" + + "github.com/cortexproject/cortex/pkg/util" + "github.com/cortexproject/cortex/pkg/util/services" +) + +var ( + statusPageTemplate = template.Must(template.New("main").Parse(` + + + + + Cortex Alertmanager Ring + + +

+			<h1>Cortex Alertmanager Ring</h1>
+			<p>{{ .Message }}</p>
+		</body>
+	</html>
+ + `)) +) + +func writeMessage(w http.ResponseWriter, message string) { + w.WriteHeader(http.StatusOK) + err := statusPageTemplate.Execute(w, struct { + Message string + }{Message: message}) + + if err != nil { + level.Error(util.Logger).Log("msg", "unable to serve alertmanager ring page", "err", err) + } +} + +func (am *MultitenantAlertmanager) RingHandler(w http.ResponseWriter, req *http.Request) { + if !am.cfg.ShardingEnabled { + writeMessage(w, "Alertmanager has no ring because sharding is disabled.") + return + } + + if am.State() != services.Running { + // we cannot read the ring before the alertmanager is in Running state, + // because that would lead to race condition. + writeMessage(w, "Alertmanager is not running yet.") + return + } + + am.ring.ServeHTTP(w, req) +} diff --git a/pkg/alertmanager/alertmanager_ring.go b/pkg/alertmanager/alertmanager_ring.go new file mode 100644 index 00000000000..0a7bb17c5b0 --- /dev/null +++ b/pkg/alertmanager/alertmanager_ring.go @@ -0,0 +1,114 @@ +package alertmanager + +import ( + "flag" + "fmt" + "os" + "time" + + "github.com/go-kit/kit/log/level" + + "github.com/cortexproject/cortex/pkg/ring" + "github.com/cortexproject/cortex/pkg/ring/kv" + "github.com/cortexproject/cortex/pkg/util" + "github.com/cortexproject/cortex/pkg/util/flagext" +) + +const ( + // RingKey is the key under which we store the alertmanager ring in the KVStore. + RingKey = "alertmanager" + + // RingNameForServer is the name of the ring used by the alertmanager server. + RingNameForServer = "alertmanager" + + // RingNumTokens is a safe default instead of exposing to config option to the user + // in order to simplify the config. + RingNumTokens = 128 +) + +// RingOp is the operation used for distributing tenants between alertmanagers. +var RingOp = ring.NewOp([]ring.IngesterState{ring.ACTIVE}, func(s ring.IngesterState) bool { + // Only ACTIVE Alertmanager get requests. If instance is not ACTIVE, we need to find another Alertmanager. + return s != ring.ACTIVE +}) + +// RingConfig masks the ring lifecycler config which contains +// many options not really required by the alertmanager ring. This config +// is used to strip down the config to the minimum, and avoid confusion +// to the user. +type RingConfig struct { + KVStore kv.Config `yaml:"kvstore" doc:"description=The key-value store used to share the hash ring across multiple instances."` + HeartbeatPeriod time.Duration `yaml:"heartbeat_period"` + HeartbeatTimeout time.Duration `yaml:"heartbeat_timeout"` + ReplicationFactor int `yaml:"replication_factor"` + + // Instance details + InstanceID string `yaml:"instance_id" doc:"hidden"` + InstanceInterfaceNames []string `yaml:"instance_interface_names"` + InstancePort int `yaml:"instance_port" doc:"hidden"` + InstanceAddr string `yaml:"instance_addr" doc:"hidden"` + + // Injected internally + ListenPort int `yaml:"-"` + RingCheckPeriod time.Duration `yaml:"-"` + + // Used for testing + SkipUnregister bool `yaml:"-"` +} + +// RegisterFlags adds the flags required to config this to the given FlagSet +func (cfg *RingConfig) RegisterFlags(f *flag.FlagSet) { + hostname, err := os.Hostname() + if err != nil { + level.Error(util.Logger).Log("msg", "failed to get hostname", "err", err) + os.Exit(1) + } + + // Prefix used by all the ring flags + rfprefix := "alertmanager.sharding-ring." 
+ + // Ring flags + cfg.KVStore.RegisterFlagsWithPrefix(rfprefix, "alertmanagers/", f) + f.DurationVar(&cfg.HeartbeatPeriod, rfprefix+"heartbeat-period", 15*time.Second, "Period at which to heartbeat to the ring.") + f.DurationVar(&cfg.HeartbeatTimeout, rfprefix+"heartbeat-timeout", time.Minute, "The heartbeat timeout after which alertmanagers are considered unhealthy within the ring.") + f.IntVar(&cfg.ReplicationFactor, rfprefix+"replication-factor", 3, "The replication factor to use when sharding the alertmanager.") + + // Instance flags + cfg.InstanceInterfaceNames = []string{"eth0", "en0"} + f.Var((*flagext.StringSlice)(&cfg.InstanceInterfaceNames), rfprefix+"instance-interface-names", "Name of network interface to read address from.") + f.StringVar(&cfg.InstanceAddr, rfprefix+"instance-addr", "", "IP address to advertise in the ring.") + f.IntVar(&cfg.InstancePort, rfprefix+"instance-port", 0, "Port to advertise in the ring (defaults to server.http-listen-port).") + f.StringVar(&cfg.InstanceID, rfprefix+"instance-id", hostname, "Instance ID to register in the ring.") + + cfg.RingCheckPeriod = 5 * time.Second +} + +// ToLifecyclerConfig returns a LifecyclerConfig based on the alertmanager +// ring config. +func (cfg *RingConfig) ToLifecyclerConfig() (ring.BasicLifecyclerConfig, error) { + instanceAddr, err := ring.GetInstanceAddr(cfg.InstanceAddr, cfg.InstanceInterfaceNames) + if err != nil { + return ring.BasicLifecyclerConfig{}, err + } + + instancePort := ring.GetInstancePort(cfg.InstancePort, cfg.ListenPort) + + return ring.BasicLifecyclerConfig{ + ID: cfg.InstanceID, + Addr: fmt.Sprintf("%s:%d", instanceAddr, instancePort), + HeartbeatPeriod: cfg.HeartbeatPeriod, + TokensObservePeriod: 0, + NumTokens: RingNumTokens, + }, nil +} + +func (cfg *RingConfig) ToRingConfig() ring.Config { + rc := ring.Config{} + flagext.DefaultValues(&rc) + + rc.KVStore = cfg.KVStore + rc.HeartbeatTimeout = cfg.HeartbeatTimeout + rc.ReplicationFactor = cfg.ReplicationFactor + + return rc +} diff --git a/pkg/alertmanager/alertmanager_ring_test.go b/pkg/alertmanager/alertmanager_ring_test.go new file mode 100644 index 00000000000..9d920f2d19b --- /dev/null +++ b/pkg/alertmanager/alertmanager_ring_test.go @@ -0,0 +1,55 @@ +package alertmanager + +import ( + "testing" + "time" + + "github.com/stretchr/testify/assert" + + "github.com/cortexproject/cortex/pkg/ring" +) + +func TestIsHealthyForAlertmanagerOperations(t *testing.T) { + t.Parallel() + + tests := map[string]struct { + instance *ring.IngesterDesc + timeout time.Duration + expected bool + }{ + "ACTIVE instance with last keepalive newer than timeout": { + instance: &ring.IngesterDesc{State: ring.ACTIVE, Timestamp: time.Now().Add(-30 * time.Second).Unix()}, + timeout: time.Minute, + expected: true, + }, + "ACTIVE instance with last keepalive older than timeout": { + instance: &ring.IngesterDesc{State: ring.ACTIVE, Timestamp: time.Now().Add(-90 * time.Second).Unix()}, + timeout: time.Minute, + expected: false, + }, + "JOINING instance with last keepalive newer than timeout": { + instance: &ring.IngesterDesc{State: ring.JOINING, Timestamp: time.Now().Add(-30 * time.Second).Unix()}, + timeout: time.Minute, + expected: false, + }, + "LEAVING instance with last keepalive newer than timeout": { + instance: &ring.IngesterDesc{State: ring.LEAVING, Timestamp: time.Now().Add(-30 * time.Second).Unix()}, + timeout: time.Minute, + expected: false, + }, + "PENDING instance with last keepalive newer than timeout": { + instance: &ring.IngesterDesc{State: ring.PENDING, 
Timestamp: time.Now().Add(-30 * time.Second).Unix()}, + timeout: time.Minute, + expected: false, + }, + } + + for testName, testData := range tests { + testData := testData + + t.Run(testName, func(t *testing.T) { + actual := testData.instance.IsHealthy(RingOp, testData.timeout, time.Now()) + assert.Equal(t, testData.expected, actual) + }) + } +} diff --git a/pkg/alertmanager/lifecycle.go b/pkg/alertmanager/lifecycle.go new file mode 100644 index 00000000000..27f1784eb6e --- /dev/null +++ b/pkg/alertmanager/lifecycle.go @@ -0,0 +1,28 @@ +package alertmanager + +import ( + "github.com/cortexproject/cortex/pkg/ring" +) + +func (r *MultitenantAlertmanager) OnRingInstanceRegister(_ *ring.BasicLifecycler, ringDesc ring.Desc, instanceExists bool, instanceID string, instanceDesc ring.IngesterDesc) (ring.IngesterState, ring.Tokens) { + // When we initialize the alertmanager instance in the ring we want to start from + // a clean situation, so whatever is the state we set it JOINING, while we keep existing + // tokens (if any). + var tokens []uint32 + if instanceExists { + tokens = instanceDesc.GetTokens() + } + + _, takenTokens := ringDesc.TokensFor(instanceID) + newTokens := ring.GenerateTokens(RingNumTokens-len(tokens), takenTokens) + + // Tokens sorting will be enforced by the parent caller. + tokens = append(tokens, newTokens...) + + return ring.JOINING, tokens +} + +func (r *MultitenantAlertmanager) OnRingInstanceTokens(_ *ring.BasicLifecycler, _ ring.Tokens) {} +func (r *MultitenantAlertmanager) OnRingInstanceStopping(_ *ring.BasicLifecycler) {} +func (r *MultitenantAlertmanager) OnRingInstanceHeartbeat(_ *ring.BasicLifecycler, _ *ring.Desc, _ *ring.IngesterDesc) { +} diff --git a/pkg/alertmanager/multitenant.go b/pkg/alertmanager/multitenant.go index 32e217c02d7..178cc8f44ce 100644 --- a/pkg/alertmanager/multitenant.go +++ b/pkg/alertmanager/multitenant.go @@ -4,6 +4,7 @@ import ( "context" "flag" "fmt" + "hash/fnv" "html/template" "io/ioutil" "net/http" @@ -22,23 +23,28 @@ import ( "github.com/prometheus/client_golang/prometheus/promauto" "github.com/cortexproject/cortex/pkg/alertmanager/alerts" + "github.com/cortexproject/cortex/pkg/ring" + "github.com/cortexproject/cortex/pkg/ring/kv" "github.com/cortexproject/cortex/pkg/tenant" "github.com/cortexproject/cortex/pkg/util" "github.com/cortexproject/cortex/pkg/util/flagext" "github.com/cortexproject/cortex/pkg/util/services" ) -var backoffConfig = util.BackoffConfig{ - // Backoff for loading initial configuration set. - MinBackoff: 100 * time.Millisecond, - MaxBackoff: 2 * time.Second, -} - const ( // If a config sets the webhook URL to this, it will be rewritten to // a URL derived from Config.AutoWebhookRoot autoWebhookURL = "http://internal.monitor" + // Reasons for (re)syncing alertmanager configurations from object storage. + reasonPeriodic = "periodic" + reasonInitial = "initial" + reasonRingChange = "ring-change" + + // ringAutoForgetUnhealthyPeriods is how many consecutive timeout periods an unhealthy instance + // in the ring will be automatically removed. 
+ ringAutoForgetUnhealthyPeriods = 5 + statusPage = ` @@ -94,6 +100,10 @@ type MultitenantAlertmanagerConfig struct { DeprecatedPeers flagext.StringSlice `yaml:"peers"` DeprecatedPeerTimeout time.Duration `yaml:"peer_timeout"` + // Enable sharding for the Alertmanager + ShardingEnabled bool `yaml:"sharding_enabled"` + ShardingRing RingConfig `yaml:"sharding_ring"` + FallbackConfigFile string `yaml:"fallback_config_file"` AutoWebhookRoot string `yaml:"auto_webhook_root"` @@ -135,6 +145,9 @@ func (cfg *MultitenantAlertmanagerConfig) RegisterFlags(f *flag.FlagSet) { f.BoolVar(&cfg.EnableAPI, "experimental.alertmanager.enable-api", false, "Enable the experimental alertmanager config api.") + f.BoolVar(&cfg.ShardingEnabled, "alertmanager.sharding-enabled", false, "Shard tenants across multiple alertmanager instances.") + + cfg.ShardingRing.RegisterFlags(f) cfg.Store.RegisterFlags(f) cfg.Cluster.RegisterFlags(f) } @@ -212,6 +225,14 @@ type MultitenantAlertmanager struct { cfg *MultitenantAlertmanagerConfig + // Ring used for sharding alertmanager instances. + ringLifecycler *ring.BasicLifecycler + ring *ring.Ring + + // Subservices manager (ring, lifecycler) + subservices *services.Manager + subservicesWatcher *services.FailureWatcher + store AlertStore // The fallback config is stored as a string and parsed every time it's needed @@ -230,6 +251,13 @@ type MultitenantAlertmanager struct { multitenantMetrics *multitenantAlertmanagerMetrics peer *cluster.Peer + + registry prometheus.Registerer + ringCheckErrors prometheus.Counter + tenantsOwned prometheus.Gauge + tenantsDiscovered prometheus.Gauge + syncTotal *prometheus.CounterVec + syncFailures *prometheus.CounterVec } // NewMultitenantAlertmanager creates a new MultitenantAlertmanager. @@ -287,10 +315,22 @@ func NewMultitenantAlertmanager(cfg *MultitenantAlertmanagerConfig, logger log.L return nil, err } - return createMultitenantAlertmanager(cfg, fallbackConfig, peer, store, logger, registerer), nil + var ringStore kv.Client + if cfg.ShardingEnabled { + ringStore, err = kv.NewClient( + cfg.ShardingRing.KVStore, + ring.GetCodec(), + kv.RegistererWithKVName(registerer, "alertmanager"), + ) + if err != nil { + return nil, errors.Wrap(err, "create KV store client") + } + } + + return createMultitenantAlertmanager(cfg, fallbackConfig, peer, store, ringStore, logger, registerer) } -func createMultitenantAlertmanager(cfg *MultitenantAlertmanagerConfig, fallbackConfig []byte, peer *cluster.Peer, store AlertStore, logger log.Logger, registerer prometheus.Registerer) *MultitenantAlertmanager { +func createMultitenantAlertmanager(cfg *MultitenantAlertmanagerConfig, fallbackConfig []byte, peer *cluster.Peer, store AlertStore, ringStore kv.Client, logger log.Logger, registerer prometheus.Registerer) (*MultitenantAlertmanager, error) { am := &MultitenantAlertmanager{ cfg: cfg, fallbackConfig: string(fallbackConfig), @@ -301,29 +341,176 @@ func createMultitenantAlertmanager(cfg *MultitenantAlertmanagerConfig, fallbackC peer: peer, store: store, logger: log.With(logger, "component", "MultiTenantAlertmanager"), + registry: registerer, + ringCheckErrors: promauto.With(registerer).NewCounter(prometheus.CounterOpts{ + Name: "cortex_alertmanager_ring_check_errors_total", + Help: "Number of errors that have occurred when checking the ring for ownership.", + }), + syncTotal: promauto.With(registerer).NewCounterVec(prometheus.CounterOpts{ + Name: "cortex_alertmanager_sync_configs_total", + Help: "Total number of times the alertmanager sync operation triggered.", + }, 
[]string{"reason"}), + syncFailures: promauto.With(registerer).NewCounterVec(prometheus.CounterOpts{ + Name: "cortex_alertmanager_sync_configs_failed_total", + Help: "Total number of times the alertmanager sync operation failed.", + }, []string{"reason"}), + tenantsDiscovered: promauto.With(registerer).NewGauge(prometheus.GaugeOpts{ + Name: "cortex_alertmanager_tenants_discovered", + Help: "Number of tenants with an Alertmanager configuration discovered.", + }), + tenantsOwned: promauto.With(registerer).NewGauge(prometheus.GaugeOpts{ + Name: "cortex_alertmanager_tenants_owned", + Help: "Current number of tenants owned by the Alertmanager instance.", + }), + } + + // Initialize the top-level metrics. + for _, r := range []string{reasonInitial, reasonPeriodic, reasonRingChange} { + am.syncTotal.WithLabelValues(r) + am.syncFailures.WithLabelValues(r) + } + + if cfg.ShardingEnabled { + lifecyclerCfg, err := am.cfg.ShardingRing.ToLifecyclerConfig() + if err != nil { + return nil, errors.Wrap(err, "failed to initialize Alertmanager's lifecycler config") + } + + // Define lifecycler delegates in reverse order (last to be called defined first because they're + // chained via "next delegate"). + delegate := ring.BasicLifecyclerDelegate(am) + delegate = ring.NewLeaveOnStoppingDelegate(delegate, am.logger) + delegate = ring.NewAutoForgetDelegate(am.cfg.ShardingRing.HeartbeatTimeout*ringAutoForgetUnhealthyPeriods, delegate, am.logger) + + am.ringLifecycler, err = ring.NewBasicLifecycler(lifecyclerCfg, RingNameForServer, RingKey, ringStore, delegate, am.logger, am.registry) + if err != nil { + return nil, errors.Wrap(err, "failed to initialize Alertmanager's lifecycler") + } + + am.ring, err = ring.NewWithStoreClientAndStrategy(am.cfg.ShardingRing.ToRingConfig(), RingNameForServer, RingKey, ringStore, ring.NewIgnoreUnhealthyInstancesReplicationStrategy()) + if err != nil { + return nil, errors.Wrap(err, "failed to initialize Alertmanager's ring") + } + + if am.registry != nil { + am.registry.MustRegister(am.ring) + } } if registerer != nil { registerer.MustRegister(am.alertmanagerMetrics) } - am.Service = services.NewTimerService(am.cfg.PollInterval, am.starting, am.iteration, am.stopping) - return am + am.Service = services.NewBasicService(am.starting, am.run, am.stopping) + + return am, nil } -func (am *MultitenantAlertmanager) starting(_ context.Context) error { - // Load initial set of all configurations before polling for new ones. - am.syncConfigs(am.loadAllConfigs()) +func (am *MultitenantAlertmanager) starting(ctx context.Context) (err error) { + defer func() { + if err == nil || am.subservices == nil { + return + } + + if stopErr := services.StopManagerAndAwaitStopped(context.Background(), am.subservices); stopErr != nil { + level.Error(am.logger).Log("msg", "failed to gracefully stop alertmanager dependencies", "err", stopErr) + } + }() + + if am.cfg.ShardingEnabled { + if am.subservices, err = services.NewManager(am.ringLifecycler, am.ring); err != nil { + return errors.Wrap(err, "failed to start alertmanager's subservices") + } + + if err = services.StartManagerAndAwaitHealthy(ctx, am.subservices); err != nil { + return errors.Wrap(err, "failed to start alertmanager's subservices") + } + + am.subservicesWatcher = services.NewFailureWatcher() + am.subservicesWatcher.WatchManager(am.subservices) + + // We wait until the instance is in the JOINING state, once it does we know that tokens are assigned to this instance and we'll be ready to perform an initial sync of configs. 
+ level.Info(am.logger).Log("waiting until alertmanager is JOINING in the ring") + if err = ring.WaitInstanceState(ctx, am.ring, am.ringLifecycler.GetInstanceID(), ring.JOINING); err != nil { + return err + } + level.Info(am.logger).Log("msg", "alertmanager is JOINING in the ring") + } + + // At this point, if sharding is enabled, the instance is registered with some tokens + // and we can run the initial iteration to sync configs. If no sharding is enabled we load _all_ the configs. + if err := am.loadAndSyncConfigs(ctx, reasonInitial); err != nil { + return err + } + + if am.cfg.ShardingEnabled { + // With the initial sync now completed, we should have loaded all assigned alertmanager configurations to this instance. We can switch it to ACTIVE and start serving requests. + if err := am.ringLifecycler.ChangeState(ctx, ring.ACTIVE); err != nil { + return errors.Wrapf(err, "switch instance to %s in the ring", ring.ACTIVE) + } + + // Wait until the ring client detected this instance in the ACTIVE state. + level.Info(am.logger).Log("msg", "waiting until alertmanager is ACTIVE in the ring") + if err := ring.WaitInstanceState(ctx, am.ring, am.ringLifecycler.GetInstanceID(), ring.ACTIVE); err != nil { + return err + } + level.Info(am.logger).Log("msg", "alertmanager is ACTIVE in the ring") + } + return nil } -func (am *MultitenantAlertmanager) iteration(_ context.Context) error { - err := am.updateConfigs() +func (am *MultitenantAlertmanager) run(ctx context.Context) error { + tick := time.NewTicker(am.cfg.PollInterval) + defer tick.Stop() + + var ringTickerChan <-chan time.Time + var ringLastState ring.ReplicationSet + + if am.cfg.ShardingEnabled { + ringLastState, _ = am.ring.GetAllHealthy(RingOp) + ringTicker := time.NewTicker(util.DurationWithJitter(am.cfg.ShardingRing.RingCheckPeriod, 0.2)) + defer ringTicker.Stop() + ringTickerChan = ringTicker.C + } + + for { + select { + case <-ctx.Done(): + return nil + case err := <-am.subservicesWatcher.Chan(): + return errors.Wrap(err, "alertmanager subservices failed") + case <-tick.C: + // We don't want to halt execution here but instead just log what happened. + if err := am.loadAndSyncConfigs(ctx, reasonPeriodic); err != nil { + level.Warn(am.logger).Log("msg", "error while synchronizing alertmanager configs", "err", err) + } + case <-ringTickerChan: + // We ignore the error because in case of error it will return an empty + // replication set which we use to compare with the previous state. + currRingState, _ := am.ring.GetAllHealthy(RingOp) + + if ring.HasReplicationSetChanged(ringLastState, currRingState) { + ringLastState = currRingState + if err := am.loadAndSyncConfigs(ctx, reasonRingChange); err != nil { + level.Warn(am.logger).Log("msg", "error while synchronizing alertmanager configs", "err", err) + } + } + } + } +} + +func (am *MultitenantAlertmanager) loadAndSyncConfigs(ctx context.Context, syncReason string) error { + level.Info(am.logger).Log("msg", "synchronizing alertmanager configs for users") + am.syncTotal.WithLabelValues(syncReason).Inc() + + cfgs, err := am.loadAlertmanagerConfigs(ctx) if err != nil { - level.Warn(am.logger).Log("msg", "error updating configs", "err", err) + am.syncFailures.WithLabelValues(syncReason).Inc() + return err } - // Returning error here would stop "MultitenantAlertmanager" service completely, - // so we return nil to keep service running. 
+ + am.syncConfigs(cfgs) return nil } @@ -340,41 +527,61 @@ func (am *MultitenantAlertmanager) stopping(_ error) error { level.Warn(am.logger).Log("msg", "failed to leave the cluster", "err", err) } } - level.Debug(am.logger).Log("msg", "stopping") - return nil -} -// Load the full set of configurations from the alert store, retrying with backoff -// until we can get them. -func (am *MultitenantAlertmanager) loadAllConfigs() map[string]alerts.AlertConfigDesc { - backoff := util.NewBackoff(context.Background(), backoffConfig) - for { - cfgs, err := am.poll() - if err == nil { - level.Debug(am.logger).Log("msg", "initial configuration load", "num_configs", len(cfgs)) - return cfgs - } - level.Warn(am.logger).Log("msg", "error fetching all configurations, backing off", "err", err) - backoff.Wait() + if am.subservices != nil { + // subservices manages ring and lifecycler, if sharding was enabled. + _ = services.StopManagerAndAwaitStopped(context.Background(), am.subservices) } + return nil } -func (am *MultitenantAlertmanager) updateConfigs() error { - cfgs, err := am.poll() +// loadAlertmanagerConfigs Loads (and filters) the alertmanagers configuration from object storage, taking into consideration the sharding strategy. +func (am *MultitenantAlertmanager) loadAlertmanagerConfigs(ctx context.Context) (map[string]alerts.AlertConfigDesc, error) { + configs, err := am.store.ListAlertConfigs(ctx) if err != nil { - return err + return nil, err } - am.syncConfigs(cfgs) - return nil + + // Without any sharding, we return _all_ the configs and there's nothing else for us to do. + if !am.cfg.ShardingEnabled { + am.tenantsDiscovered.Set(float64(len(configs))) + am.tenantsOwned.Set(float64(len(configs))) + return configs, nil + } + + ownedConfigs := map[string]alerts.AlertConfigDesc{} + for userID, cfg := range configs { + owned, err := am.isConfigOwned(userID) + if err != nil { + am.ringCheckErrors.Inc() + level.Error(am.logger).Log("msg", "failed to load alertmanager configuration for user", "user", userID, "err", err) + continue + } + + if owned { + level.Debug(am.logger).Log("msg", "alertmanager configuration owned", "user", userID) + ownedConfigs[userID] = cfg + } else { + level.Debug(am.logger).Log("msg", "alertmanager configuration not owned, ignoring", "user", userID) + } + } + + am.tenantsDiscovered.Set(float64(len(configs))) + am.tenantsOwned.Set(float64(len(ownedConfigs))) + return ownedConfigs, nil } -// poll the alert store. Not re-entrant. -func (am *MultitenantAlertmanager) poll() (map[string]alerts.AlertConfigDesc, error) { - cfgs, err := am.store.ListAlertConfigs(context.Background()) +func (am *MultitenantAlertmanager) isConfigOwned(userID string) (bool, error) { + ringHasher := fnv.New32a() + // Hasher never returns err. 
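+	// The tenant ID is hashed to a ring token below; the config is considered owned if this
+	// instance is among the alertmanagers the ring returns for that token.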
+ _, _ = ringHasher.Write([]byte(userID)) + + alertmanagers, err := am.ring.Get(ringHasher.Sum32(), RingOp, nil, nil, nil) if err != nil { - return nil, err + return false, errors.Wrap(err, "error reading ring to verify config ownership") } - return cfgs, nil + + return alertmanagers.Includes(am.ringLifecycler.GetInstanceAddr()), nil } func (am *MultitenantAlertmanager) syncConfigs(cfgs map[string]alerts.AlertConfigDesc) { diff --git a/pkg/alertmanager/multitenant_test.go b/pkg/alertmanager/multitenant_test.go index 451f32ba9ff..c55b5105b3f 100644 --- a/pkg/alertmanager/multitenant_test.go +++ b/pkg/alertmanager/multitenant_test.go @@ -21,8 +21,12 @@ import ( "github.com/weaveworks/common/user" "github.com/cortexproject/cortex/pkg/alertmanager/alerts" + "github.com/cortexproject/cortex/pkg/ring" + "github.com/cortexproject/cortex/pkg/ring/kv/consul" + "github.com/cortexproject/cortex/pkg/util" "github.com/cortexproject/cortex/pkg/util/flagext" "github.com/cortexproject/cortex/pkg/util/services" + "github.com/cortexproject/cortex/pkg/util/test" ) var ( @@ -41,26 +45,63 @@ receivers: // basic easily configurable mock type mockAlertStore struct { - configs map[string]alerts.AlertConfigDesc + configs map[string]alerts.AlertConfigDesc + OnList func() + WithListErr error } -func (m *mockAlertStore) ListAlertConfigs(ctx context.Context) (map[string]alerts.AlertConfigDesc, error) { +func (m *mockAlertStore) ListAlertConfigs(_ context.Context) (map[string]alerts.AlertConfigDesc, error) { + if m.OnList != nil { + m.OnList() + } + + if m.WithListErr != nil { + return nil, m.WithListErr + } + return m.configs, nil } -func (m *mockAlertStore) GetAlertConfig(ctx context.Context, user string) (alerts.AlertConfigDesc, error) { +func (m *mockAlertStore) GetAlertConfig(_ context.Context, _ string) (alerts.AlertConfigDesc, error) { return alerts.AlertConfigDesc{}, fmt.Errorf("not implemented") } -func (m *mockAlertStore) SetAlertConfig(ctx context.Context, cfg alerts.AlertConfigDesc) error { +func (m *mockAlertStore) SetAlertConfig(_ context.Context, cfg alerts.AlertConfigDesc) error { m.configs[cfg.User] = cfg return nil } -func (m *mockAlertStore) DeleteAlertConfig(ctx context.Context, user string) error { +func (m *mockAlertStore) DeleteAlertConfig(_ context.Context, _ string) error { return fmt.Errorf("not implemented") } +func mockAlertmanagerConfig(t *testing.T) *MultitenantAlertmanagerConfig { + t.Helper() + + externalURL := flagext.URLValue{} + err := externalURL.Set("http://localhost/api/prom") + require.NoError(t, err) + + tempDir, err := ioutil.TempDir(os.TempDir(), "alertmanager") + require.NoError(t, err) + + t.Cleanup(func() { + err := os.RemoveAll(tempDir) + require.NoError(t, err) + }) + + cfg := &MultitenantAlertmanagerConfig{} + flagext.DefaultValues(cfg) + + cfg.ExternalURL = externalURL + cfg.DataDir = tempDir + cfg.ShardingRing.InstanceID = "test" + cfg.ShardingRing.InstanceAddr = "127.0.0.1" + cfg.PollInterval = time.Minute + + return cfg +} + func TestLoadAllConfigs(t *testing.T) { mockStore := &mockAlertStore{ configs: map[string]alerts.AlertConfigDesc{ @@ -77,22 +118,14 @@ func TestLoadAllConfigs(t *testing.T) { }, } - externalURL := flagext.URLValue{} - err := externalURL.Set("http://localhost/api/prom") - require.NoError(t, err) - - tempDir, err := ioutil.TempDir(os.TempDir(), "alertmanager") - require.NoError(t, err) - defer os.RemoveAll(tempDir) - reg := prometheus.NewPedanticRegistry() - am := createMultitenantAlertmanager(&MultitenantAlertmanagerConfig{ - ExternalURL: 
externalURL, - DataDir: tempDir, - }, nil, nil, mockStore, log.NewNopLogger(), reg) + cfg := mockAlertmanagerConfig(t) + am, err := createMultitenantAlertmanager(cfg, nil, nil, mockStore, nil, log.NewNopLogger(), reg) + require.NoError(t, err) // Ensure the configs are synced correctly - require.NoError(t, am.updateConfigs()) + err = am.loadAndSyncConfigs(context.Background(), reasonPeriodic) + require.NoError(t, err) require.Len(t, am.alertmanagers, 2) currentConfig, exists := am.cfgs["user1"] @@ -113,7 +146,8 @@ func TestLoadAllConfigs(t *testing.T) { Templates: []*alerts.TemplateDesc{}, } - require.NoError(t, am.updateConfigs()) + err = am.loadAndSyncConfigs(context.Background(), reasonPeriodic) + require.NoError(t, err) require.Len(t, am.alertmanagers, 3) assert.NoError(t, testutil.GatherAndCompare(reg, bytes.NewBufferString(` @@ -131,16 +165,18 @@ func TestLoadAllConfigs(t *testing.T) { Templates: []*alerts.TemplateDesc{}, } - require.NoError(t, am.updateConfigs()) + err = am.loadAndSyncConfigs(context.Background(), reasonPeriodic) + require.NoError(t, err) currentConfig, exists = am.cfgs["user1"] require.True(t, exists) require.Equal(t, simpleConfigTwo, currentConfig.RawConfig) - // Test Delete User, ensure config is remove but alertmananger + // Test Delete User, ensure config is removed but alertmanager // exists and is set to inactive delete(mockStore.configs, "user3") - require.NoError(t, am.updateConfigs()) + err = am.loadAndSyncConfigs(context.Background(), reasonPeriodic) + require.NoError(t, err) currentConfig, exists = am.cfgs["user3"] require.False(t, exists) require.Equal(t, "", currentConfig.RawConfig) @@ -163,7 +199,8 @@ func TestLoadAllConfigs(t *testing.T) { Templates: []*alerts.TemplateDesc{}, } - require.NoError(t, am.updateConfigs()) + err = am.loadAndSyncConfigs(context.Background(), reasonPeriodic) + require.NoError(t, err) currentConfig, exists = am.cfgs["user3"] require.True(t, exists) @@ -183,20 +220,18 @@ func TestLoadAllConfigs(t *testing.T) { } func TestAlertmanager_NoExternalURL(t *testing.T) { - tempDir, err := ioutil.TempDir(os.TempDir(), "alertmanager") - require.NoError(t, err) - defer os.RemoveAll(tempDir) + amConfig := mockAlertmanagerConfig(t) + amConfig.ExternalURL = flagext.URLValue{} // no external URL // Create the Multitenant Alertmanager. reg := prometheus.NewPedanticRegistry() - _, err = NewMultitenantAlertmanager(&MultitenantAlertmanagerConfig{ - DataDir: tempDir, - }, log.NewNopLogger(), reg) + _, err := NewMultitenantAlertmanager(amConfig, log.NewNopLogger(), reg) require.EqualError(t, err, "unable to create Alertmanager because the external URL has not been configured") } func TestAlertmanager_ServeHTTP(t *testing.T) { + amConfig := mockAlertmanagerConfig(t) mockStore := &mockAlertStore{ configs: map[string]alerts.AlertConfigDesc{}, } @@ -205,17 +240,12 @@ func TestAlertmanager_ServeHTTP(t *testing.T) { err := externalURL.Set("http://localhost:8080/alertmanager") require.NoError(t, err) - tempDir, err := ioutil.TempDir(os.TempDir(), "alertmanager") - require.NoError(t, err) - defer os.RemoveAll(tempDir) + amConfig.ExternalURL = externalURL // Create the Multitenant Alertmanager. 
reg := prometheus.NewPedanticRegistry() - am := createMultitenantAlertmanager(&MultitenantAlertmanagerConfig{ - ExternalURL: externalURL, - DataDir: tempDir, - PollInterval: time.Minute, - }, nil, nil, mockStore, log.NewNopLogger(), reg) + am, err := createMultitenantAlertmanager(amConfig, nil, nil, mockStore, nil, log.NewNopLogger(), reg) + require.NoError(t, err) require.NoError(t, services.StartAndAwaitRunning(context.Background(), am)) defer services.StopAndAwaitTerminated(context.Background(), am) //nolint:errcheck @@ -242,7 +272,7 @@ func TestAlertmanager_ServeHTTP(t *testing.T) { } // Make the alertmanager pick it up, then pause it. - err = am.updateConfigs() + err = am.loadAndSyncConfigs(context.Background(), reasonPeriodic) require.NoError(t, err) // Request when AM is active. @@ -301,6 +331,7 @@ func verify404(ctx context.Context, t *testing.T, am *MultitenantAlertmanager, m } func TestAlertmanager_ServeHTTPWithFallbackConfig(t *testing.T) { + amConfig := mockAlertmanagerConfig(t) mockStore := &mockAlertStore{ configs: map[string]alerts.AlertConfigDesc{}, } @@ -309,10 +340,6 @@ func TestAlertmanager_ServeHTTPWithFallbackConfig(t *testing.T) { err := externalURL.Set("http://localhost:8080/alertmanager") require.NoError(t, err) - tempDir, err := ioutil.TempDir(os.TempDir(), "alertmanager") - require.NoError(t, err) - defer os.RemoveAll(tempDir) - fallbackCfg := ` global: smtp_smarthost: 'localhost:25' @@ -324,13 +351,11 @@ receivers: email_configs: - to: 'youraddress@example.org' ` + amConfig.ExternalURL = externalURL // Create the Multitenant Alertmanager. - am := createMultitenantAlertmanager(&MultitenantAlertmanagerConfig{ - ExternalURL: externalURL, - DataDir: tempDir, - PollInterval: time.Minute, - }, nil, nil, mockStore, log.NewNopLogger(), nil) + am, err := createMultitenantAlertmanager(amConfig, nil, nil, mockStore, nil, log.NewNopLogger(), nil) + require.NoError(t, err) am.fallbackConfig = fallbackCfg require.NoError(t, services.StartAndAwaitRunning(context.Background(), am)) @@ -351,7 +376,7 @@ receivers: require.True(t, am.alertmanagers["user1"].IsActive()) // Even after a poll it does not pause your Alertmanager - err = am.updateConfigs() + err = am.loadAndSyncConfigs(context.Background(), reasonPeriodic) require.NoError(t, err) require.True(t, am.alertmanagers["user1"].IsActive()) @@ -368,3 +393,444 @@ receivers: body, _ := ioutil.ReadAll(resp.Body) require.Equal(t, "the Alertmanager is not configured\n", string(body)) } + +func TestAlertmanager_InitialSyncWithSharding(t *testing.T) { + tc := []struct { + name string + existing bool + initialState ring.IngesterState + initialTokens ring.Tokens + }{ + { + name: "with no instance in the ring", + existing: false, + }, + { + name: "with an instance already in the ring with PENDING state and no tokens", + existing: true, + initialState: ring.PENDING, + initialTokens: ring.Tokens{}, + }, + { + name: "with an instance already in the ring with JOINING state and some tokens", + existing: true, + initialState: ring.JOINING, + initialTokens: ring.Tokens{1, 2, 3, 4, 5, 6, 7, 8, 9}, + }, + { + name: "with an instance already in the ring with ACTIVE state and all tokens", + existing: true, + initialState: ring.ACTIVE, + initialTokens: ring.GenerateTokens(128, nil), + }, + { + name: "with an instance already in the ring with LEAVING state and all tokens", + existing: true, + initialState: ring.LEAVING, + initialTokens: ring.Tokens{100000}, + }, + } + + for _, tt := range tc { + t.Run(tt.name, func(t *testing.T) { + ctx := 
context.Background() + amConfig := mockAlertmanagerConfig(t) + amConfig.ShardingEnabled = true + ringStore := consul.NewInMemoryClient(ring.GetCodec()) + mockStore := &mockAlertStore{ + configs: map[string]alerts.AlertConfigDesc{}, + } + + // Setup the initial instance state in the ring. + if tt.existing { + require.NoError(t, ringStore.CAS(ctx, RingKey, func(in interface{}) (interface{}, bool, error) { + ringDesc := ring.GetOrCreateRingDesc(in) + ringDesc.AddIngester(amConfig.ShardingRing.InstanceID, amConfig.ShardingRing.InstanceAddr, "", tt.initialTokens, tt.initialState, time.Now()) + return ringDesc, true, nil + })) + } + + am, err := createMultitenantAlertmanager(amConfig, nil, nil, mockStore, ringStore, log.NewNopLogger(), nil) + require.NoError(t, err) + defer services.StopAndAwaitTerminated(ctx, am) //nolint:errcheck + + // Before being registered in the ring. + require.False(t, am.ringLifecycler.IsRegistered()) + require.Equal(t, ring.PENDING.String(), am.ringLifecycler.GetState().String()) + require.Equal(t, 0, len(am.ringLifecycler.GetTokens())) + require.Equal(t, ring.Tokens{}, am.ringLifecycler.GetTokens()) + + // During the initial sync, we expect two things. That the instance is already + // registered with the ring (meaning we have tokens) and that its state is JOINING. + mockStore.OnList = func() { + require.True(t, am.ringLifecycler.IsRegistered()) + require.Equal(t, ring.JOINING.String(), am.ringLifecycler.GetState().String()) + } + + // Once successfully started, the instance should be ACTIVE in the ring. + require.NoError(t, services.StartAndAwaitRunning(ctx, am)) + + // After being registered in the ring. + require.True(t, am.ringLifecycler.IsRegistered()) + require.Equal(t, ring.ACTIVE.String(), am.ringLifecycler.GetState().String()) + require.Equal(t, 128, len(am.ringLifecycler.GetTokens())) + require.Subset(t, am.ringLifecycler.GetTokens(), tt.initialTokens) + }) + } +} + +func TestAlertmanager_PerTenantSharding(t *testing.T) { + tc := []struct { + name string + tenantShardSize int + replicationFactor int + instances int + configs int + expectedTenants int + withSharding bool + }{ + { + name: "sharding disabled, 1 instance", + instances: 1, + configs: 10, + expectedTenants: 10, + }, + { + name: "sharding disabled, 2 instances", + instances: 2, + configs: 10, + expectedTenants: 10 * 2, // each instance loads _all_ tenants. 
+ }, + { + name: "sharding enabled, 1 instance, RF = 1", + withSharding: true, + instances: 1, + replicationFactor: 1, + configs: 10, + expectedTenants: 10, // same as no sharding and 1 instance + }, + { + name: "sharding enabled, 2 instances, RF = 1", + withSharding: true, + instances: 2, + replicationFactor: 1, + configs: 10, + expectedTenants: 10, // configs * replication factor + }, + { + name: "sharding enabled, 3 instances, RF = 2", + withSharding: true, + instances: 3, + replicationFactor: 2, + configs: 10, + expectedTenants: 20, // configs * replication factor + }, + { + name: "sharding enabled, 5 instances, RF = 3", + withSharding: true, + instances: 5, + replicationFactor: 3, + configs: 10, + expectedTenants: 30, // configs * replication factor + }, + } + + for _, tt := range tc { + t.Run(tt.name, func(t *testing.T) { + ctx := context.Background() + ringStore := consul.NewInMemoryClient(ring.GetCodec()) + mockStore := &mockAlertStore{ + configs: map[string]alerts.AlertConfigDesc{}, + } + + var instances []*MultitenantAlertmanager + var instanceIDs []string + registries := util.NewUserRegistries() + + // First, add the number of configs to the store. + for i := 1; i <= tt.configs; i++ { + u := fmt.Sprintf("u-%d", i) + mockStore.configs[u] = alerts.AlertConfigDesc{ + User: u, + RawConfig: simpleConfigOne, + Templates: []*alerts.TemplateDesc{}, + } + } + + // Then, create the alertmanager instances, start them and add their registries to the slice. + for i := 1; i <= tt.instances; i++ { + instanceIDs = append(instanceIDs, fmt.Sprintf("alertmanager-%d", i)) + instanceID := fmt.Sprintf("alertmanager-%d", i) + + amConfig := mockAlertmanagerConfig(t) + amConfig.ShardingRing.ReplicationFactor = tt.replicationFactor + amConfig.ShardingRing.InstanceID = instanceID + amConfig.ShardingRing.InstanceAddr = fmt.Sprintf("127.0.0.%d", i) + // Do not check the ring topology changes or poll in an interval in this test (we explicitly sync alertmanagers). + amConfig.PollInterval = time.Hour + amConfig.ShardingRing.RingCheckPeriod = time.Hour + + if tt.withSharding { + amConfig.ShardingEnabled = true + } + + reg := prometheus.NewPedanticRegistry() + am, err := createMultitenantAlertmanager(amConfig, nil, nil, mockStore, ringStore, log.NewNopLogger(), reg) + require.NoError(t, err) + defer services.StopAndAwaitTerminated(ctx, am) //nolint:errcheck + + require.NoError(t, services.StartAndAwaitRunning(ctx, am)) + + instances = append(instances, am) + instanceIDs = append(instanceIDs, instanceID) + registries.AddUserRegistry(instanceID, reg) + } + + // If we're testing sharding, we need make sure the ring is settled. + if tt.withSharding { + ctx, cancel := context.WithTimeout(ctx, 5*time.Second) + defer cancel() + + // The alertmanager is ready to be tested once all instances are ACTIVE and the ring settles. + for _, am := range instances { + for _, id := range instanceIDs { + require.NoError(t, ring.WaitInstanceState(ctx, am.ring, id, ring.ACTIVE)) + } + } + } + + // Now that the ring has settled, sync configs with the instances. 
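+			// Each instance discovers every config but only keeps the ones it owns, so the totals below
+			// should add up to configs * replication factor with sharding, or configs * instances without it.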
+ var numConfigs, numInstances int + for _, am := range instances { + err := am.loadAndSyncConfigs(ctx, reasonRingChange) + require.NoError(t, err) + numConfigs += len(am.cfgs) + numInstances += len(am.alertmanagers) + } + + metrics := registries.BuildMetricFamiliesPerUser() + assert.Equal(t, tt.expectedTenants, numConfigs) + assert.Equal(t, tt.expectedTenants, numInstances) + assert.Equal(t, float64(tt.expectedTenants), metrics.GetSumOfGauges("cortex_alertmanager_tenants_owned")) + assert.Equal(t, float64(tt.configs*tt.instances), metrics.GetSumOfGauges("cortex_alertmanager_tenants_discovered")) + }) + } +} + +func TestAlertmanager_SyncOnRingTopologyChanges(t *testing.T) { + registeredAt := time.Now() + + tc := []struct { + name string + setupRing func(desc *ring.Desc) + updateRing func(desc *ring.Desc) + expected bool + }{ + { + name: "when an instance is added to the ring", + setupRing: func(desc *ring.Desc) { + desc.AddIngester("alertmanager-1", "127.0.0.1", "", ring.Tokens{1, 2, 3}, ring.ACTIVE, registeredAt) + }, + updateRing: func(desc *ring.Desc) { + desc.AddIngester("alertmanager-2", "127.0.0.2", "", ring.Tokens{4, 5, 6}, ring.ACTIVE, registeredAt) + }, + expected: true, + }, + { + name: "when an instance is removed from the ring", + setupRing: func(desc *ring.Desc) { + desc.AddIngester("alertmanager-1", "127.0.0.1", "", ring.Tokens{1, 2, 3}, ring.ACTIVE, registeredAt) + desc.AddIngester("alertmanager-2", "127.0.0.2", "", ring.Tokens{4, 5, 6}, ring.ACTIVE, registeredAt) + }, + updateRing: func(desc *ring.Desc) { + desc.RemoveIngester("alertmanager-1") + }, + expected: true, + }, + { + name: "should sync when an instance changes state", + setupRing: func(desc *ring.Desc) { + desc.AddIngester("alertmanager-1", "127.0.0.1", "", ring.Tokens{1, 2, 3}, ring.ACTIVE, registeredAt) + desc.AddIngester("alertmanager-2", "127.0.0.2", "", ring.Tokens{4, 5, 6}, ring.JOINING, registeredAt) + }, + updateRing: func(desc *ring.Desc) { + instance := desc.Ingesters["alertmanager-2"] + instance.State = ring.ACTIVE + desc.Ingesters["alertmanager-2"] = instance + }, + expected: true, + }, + { + name: "should sync when an healthy instance becomes unhealthy", + setupRing: func(desc *ring.Desc) { + desc.AddIngester("alertmanager-1", "127.0.0.1", "", ring.Tokens{1, 2, 3}, ring.ACTIVE, registeredAt) + desc.AddIngester("alertmanager-2", "127.0.0.2", "", ring.Tokens{4, 5, 6}, ring.ACTIVE, registeredAt) + }, + updateRing: func(desc *ring.Desc) { + instance := desc.Ingesters["alertmanager-1"] + instance.Timestamp = time.Now().Add(-time.Hour).Unix() + desc.Ingesters["alertmanager-1"] = instance + }, + expected: true, + }, + { + name: "should sync when an unhealthy instance becomes healthy", + setupRing: func(desc *ring.Desc) { + desc.AddIngester("alertmanager-1", "127.0.0.1", "", ring.Tokens{1, 2, 3}, ring.ACTIVE, registeredAt) + + instance := desc.AddIngester("alertmanager-2", "127.0.0.2", "", ring.Tokens{4, 5, 6}, ring.ACTIVE, registeredAt) + instance.Timestamp = time.Now().Add(-time.Hour).Unix() + desc.Ingesters["alertmanager-2"] = instance + }, + updateRing: func(desc *ring.Desc) { + instance := desc.Ingesters["alertmanager-2"] + instance.Timestamp = time.Now().Unix() + desc.Ingesters["alertmanager-2"] = instance + }, + expected: true, + }, + { + name: "should NOT sync when an instance updates the heartbeat", + setupRing: func(desc *ring.Desc) { + desc.AddIngester("alertmanager-1", "127.0.0.1", "", ring.Tokens{1, 2, 3}, ring.ACTIVE, registeredAt) + desc.AddIngester("alertmanager-2", "127.0.0.2", "", 
ring.Tokens{4, 5, 6}, ring.ACTIVE, registeredAt) + }, + updateRing: func(desc *ring.Desc) { + instance := desc.Ingesters["alertmanager-1"] + instance.Timestamp = time.Now().Add(time.Second).Unix() + desc.Ingesters["alertmanager-1"] = instance + }, + expected: false, + }, + { + name: "should NOT sync when an instance is auto-forgotten in the ring but was already unhealthy in the previous state", + setupRing: func(desc *ring.Desc) { + desc.AddIngester("alertmanager-1", "127.0.0.1", "", ring.Tokens{1, 2, 3}, ring.ACTIVE, registeredAt) + desc.AddIngester("alertmanager-2", "127.0.0.2", "", ring.Tokens{4, 5, 6}, ring.ACTIVE, registeredAt) + + instance := desc.Ingesters["alertmanager-2"] + instance.Timestamp = time.Now().Add(-time.Hour).Unix() + desc.Ingesters["alertmanager-2"] = instance + }, + updateRing: func(desc *ring.Desc) { + desc.RemoveIngester("alertmanager-2") + }, + expected: false, + }, + } + + for _, tt := range tc { + t.Run(tt.name, func(t *testing.T) { + ctx := context.Background() + amConfig := mockAlertmanagerConfig(t) + amConfig.ShardingEnabled = true + amConfig.ShardingRing.RingCheckPeriod = 100 * time.Millisecond + amConfig.PollInterval = time.Hour // Don't trigger the periodic check. + + ringStore := consul.NewInMemoryClient(ring.GetCodec()) + mockStore := &mockAlertStore{ + configs: map[string]alerts.AlertConfigDesc{}, + } + + reg := prometheus.NewPedanticRegistry() + am, err := createMultitenantAlertmanager(amConfig, nil, nil, mockStore, ringStore, log.NewNopLogger(), reg) + require.NoError(t, err) + + require.NoError(t, ringStore.CAS(ctx, RingKey, func(in interface{}) (interface{}, bool, error) { + ringDesc := ring.GetOrCreateRingDesc(in) + tt.setupRing(ringDesc) + return ringDesc, true, nil + })) + + require.NoError(t, services.StartAndAwaitRunning(ctx, am)) + defer services.StopAndAwaitTerminated(ctx, am) //nolint:errcheck + + // Make sure the initial sync happened. + regs := util.NewUserRegistries() + regs.AddUserRegistry("test", reg) + metrics := regs.BuildMetricFamiliesPerUser() + assert.Equal(t, float64(1), metrics.GetSumOfCounters("cortex_alertmanager_sync_configs_total")) + + // Change the ring topology. + require.NoError(t, ringStore.CAS(ctx, RingKey, func(in interface{}) (interface{}, bool, error) { + ringDesc := ring.GetOrCreateRingDesc(in) + tt.updateRing(ringDesc) + return ringDesc, true, nil + })) + + // Assert if we expected a sync or not. 
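+			// The sync counter is already at 1 from the initial sync, so a ring-change triggered sync should move it to 2.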
+ if tt.expected { + test.Poll(t, time.Second, float64(2), func() interface{} { + metrics := regs.BuildMetricFamiliesPerUser() + return metrics.GetSumOfCounters("cortex_alertmanager_sync_configs_total") + }) + } else { + time.Sleep(250 * time.Millisecond) + + metrics := regs.BuildMetricFamiliesPerUser() + assert.Equal(t, float64(1), metrics.GetSumOfCounters("cortex_alertmanager_sync_configs_total")) + } + }) + } +} + +func TestAlertmanager_RingLifecyclerShouldAutoForgetUnhealthyInstances(t *testing.T) { + const unhealthyInstanceID = "alertmanager-bad-1" + const heartbeatTimeout = time.Minute + ctx := context.Background() + amConfig := mockAlertmanagerConfig(t) + amConfig.ShardingEnabled = true + amConfig.ShardingRing.HeartbeatPeriod = 100 * time.Millisecond + amConfig.ShardingRing.HeartbeatTimeout = heartbeatTimeout + + ringStore := consul.NewInMemoryClient(ring.GetCodec()) + mockStore := &mockAlertStore{ + configs: map[string]alerts.AlertConfigDesc{}, + } + + am, err := createMultitenantAlertmanager(amConfig, nil, nil, mockStore, ringStore, log.NewNopLogger(), nil) + require.NoError(t, err) + require.NoError(t, services.StartAndAwaitRunning(ctx, am)) + defer services.StopAndAwaitTerminated(ctx, am) //nolint:errcheck + + require.NoError(t, ringStore.CAS(ctx, RingKey, func(in interface{}) (interface{}, bool, error) { + ringDesc := ring.GetOrCreateRingDesc(in) + instance := ringDesc.AddIngester(unhealthyInstanceID, "127.0.0.1", "", ring.GenerateTokens(RingNumTokens, nil), ring.ACTIVE, time.Now()) + instance.Timestamp = time.Now().Add(-(ringAutoForgetUnhealthyPeriods + 1) * heartbeatTimeout).Unix() + ringDesc.Ingesters[unhealthyInstanceID] = instance + + return ringDesc, true, nil + })) + + test.Poll(t, time.Second, false, func() interface{} { + d, err := ringStore.Get(ctx, RingKey) + if err != nil { + return err + } + + _, ok := ring.GetOrCreateRingDesc(d).Ingesters[unhealthyInstanceID] + return ok + }) +} + +func TestAlertmanager_InitialSyncFailureWithSharding(t *testing.T) { + ctx := context.Background() + amConfig := mockAlertmanagerConfig(t) + amConfig.ShardingEnabled = true + ringStore := consul.NewInMemoryClient(ring.GetCodec()) + mockStore := &mockAlertStore{ + configs: map[string]alerts.AlertConfigDesc{}, + WithListErr: fmt.Errorf("a fetch list failure"), + } + + am, err := createMultitenantAlertmanager(amConfig, nil, nil, mockStore, ringStore, log.NewNopLogger(), nil) + require.NoError(t, err) + defer services.StopAndAwaitTerminated(ctx, am) //nolint:errcheck + + require.NoError(t, am.StartAsync(ctx)) + err = am.AwaitRunning(ctx) + require.Error(t, err) + require.Equal(t, services.Failed, am.State()) + require.False(t, am.ringLifecycler.IsRegistered()) + require.NotNil(t, am.ring) +} diff --git a/pkg/api/api.go b/pkg/api/api.go index 412eeff9722..83777390ee6 100644 --- a/pkg/api/api.go +++ b/pkg/api/api.go @@ -144,8 +144,10 @@ func (a *API) RegisterRoutesWithPrefix(prefix string, handler http.Handler, auth // serve endpoints using the legacy http-prefix if it is not run as a single binary. 
func (a *API) RegisterAlertmanager(am *alertmanager.MultitenantAlertmanager, target, apiEnabled bool) { a.indexPage.AddLink(SectionAdminEndpoints, "/multitenant_alertmanager/status", "Alertmanager Status") + a.indexPage.AddLink(SectionAdminEndpoints, "/multitenant_alertmanager/ring", "Alertmanager Ring Status") // Ensure this route is registered before the prefixed AM route a.RegisterRoute("/multitenant_alertmanager/status", am.GetStatusHandler(), false, "GET") + a.RegisterRoute("/multitenant_alertmanager/ring", http.HandlerFunc(am.RingHandler), false, "GET", "POST") // UI components lead to a large number of routes to support, utilize a path prefix instead a.RegisterRoutesWithPrefix(a.cfg.AlertmanagerHTTPPrefix, am, true) diff --git a/pkg/cortex/modules.go b/pkg/cortex/modules.go index 3a65d74f629..5a1749c8b14 100644 --- a/pkg/cortex/modules.go +++ b/pkg/cortex/modules.go @@ -671,6 +671,8 @@ func (t *Cortex) initConfig() (serv services.Service, err error) { } func (t *Cortex) initAlertManager() (serv services.Service, err error) { + t.Cfg.Alertmanager.ShardingRing.ListenPort = t.Cfg.Server.HTTPListenPort + t.Alertmanager, err = alertmanager.NewMultitenantAlertmanager(&t.Cfg.Alertmanager, util.Logger, prometheus.DefaultRegisterer) if err != nil { return @@ -728,6 +730,7 @@ func (t *Cortex) initMemberlistKV() (services.Service, error) { t.Cfg.StoreGateway.ShardingRing.KVStore.MemberlistKV = t.MemberlistKV.GetMemberlistKV t.Cfg.Compactor.ShardingRing.KVStore.MemberlistKV = t.MemberlistKV.GetMemberlistKV t.Cfg.Ruler.Ring.KVStore.MemberlistKV = t.MemberlistKV.GetMemberlistKV + t.Cfg.Alertmanager.ShardingRing.KVStore.MemberlistKV = t.MemberlistKV.GetMemberlistKV return t.MemberlistKV, nil } @@ -834,7 +837,7 @@ func (t *Cortex) setupModuleManager() error { TableManager: {API}, Ruler: {Overrides, DistributorService, Store, StoreQueryable, RulerStorage}, Configs: {API}, - AlertManager: {API}, + AlertManager: {API, MemberlistKV}, Compactor: {API, MemberlistKV}, StoreGateway: {API, Overrides, MemberlistKV}, ChunksPurger: {Store, DeleteRequestsStore, API}, diff --git a/pkg/querier/blocks_store_queryable.go b/pkg/querier/blocks_store_queryable.go index ead4db65169..fd56b8583c0 100644 --- a/pkg/querier/blocks_store_queryable.go +++ b/pkg/querier/blocks_store_queryable.go @@ -200,7 +200,7 @@ func NewBlocksStoreQueryableFromConfig(querierCfg Config, gatewayCfg storegatewa return nil, errors.Wrap(err, "failed to create store-gateway ring backend") } - storesRing, err := ring.NewWithStoreClientAndStrategy(storesRingCfg, storegateway.RingNameForClient, storegateway.RingKey, storesRingBackend, &storegateway.BlocksReplicationStrategy{}) + storesRing, err := ring.NewWithStoreClientAndStrategy(storesRingCfg, storegateway.RingNameForClient, storegateway.RingKey, storesRingBackend, ring.NewIgnoreUnhealthyInstancesReplicationStrategy()) if err != nil { return nil, errors.Wrap(err, "failed to create store-gateway ring client") } diff --git a/pkg/querier/blocks_store_replicated_set_test.go b/pkg/querier/blocks_store_replicated_set_test.go index d4f1cdc3d64..5d0606df6fd 100644 --- a/pkg/querier/blocks_store_replicated_set_test.go +++ b/pkg/querier/blocks_store_replicated_set_test.go @@ -17,7 +17,6 @@ import ( "github.com/cortexproject/cortex/pkg/ring" "github.com/cortexproject/cortex/pkg/ring/kv/consul" cortex_tsdb "github.com/cortexproject/cortex/pkg/storage/tsdb" - "github.com/cortexproject/cortex/pkg/storegateway" "github.com/cortexproject/cortex/pkg/util" "github.com/cortexproject/cortex/pkg/util/flagext" 
"github.com/cortexproject/cortex/pkg/util/services" @@ -327,7 +326,7 @@ func TestBlocksStoreReplicationSet_GetClientsFor(t *testing.T) { flagext.DefaultValues(&ringCfg) ringCfg.ReplicationFactor = testData.replicationFactor - r, err := ring.NewWithStoreClientAndStrategy(ringCfg, "test", "test", ringStore, &storegateway.BlocksReplicationStrategy{}) + r, err := ring.NewWithStoreClientAndStrategy(ringCfg, "test", "test", ringStore, ring.NewIgnoreUnhealthyInstancesReplicationStrategy()) require.NoError(t, err) limits := &blocksStoreLimitsMock{ diff --git a/pkg/ring/replication_strategy.go b/pkg/ring/replication_strategy.go index 1e372276a63..5156ba22d12 100644 --- a/pkg/ring/replication_strategy.go +++ b/pkg/ring/replication_strategy.go @@ -3,6 +3,8 @@ package ring import ( "fmt" "time" + + "github.com/pkg/errors" ) type ReplicationStrategy interface { @@ -63,6 +65,31 @@ func (s *defaultReplicationStrategy) Filter(ingesters []IngesterDesc, op Operati return ingesters, len(ingesters) - minSuccess, nil } +type ignoreUnhealthyInstancesReplicationStrategy struct{} + +func NewIgnoreUnhealthyInstancesReplicationStrategy() ReplicationStrategy { + return &ignoreUnhealthyInstancesReplicationStrategy{} +} + +func (r *ignoreUnhealthyInstancesReplicationStrategy) Filter(instances []IngesterDesc, op Operation, _ int, heartbeatTimeout time.Duration, _ bool) (healthy []IngesterDesc, maxFailures int, err error) { + now := time.Now() + // Filter out unhealthy instances. + for i := 0; i < len(instances); { + if instances[i].IsHealthy(op, heartbeatTimeout, now) { + i++ + } else { + instances = append(instances[:i], instances[i+1:]...) + } + } + + // We need at least 1 healthy instance no matter what is the replication factor set to. + if len(instances) == 0 { + return nil, 0, errors.New("at least 1 healthy replica required, could only find 0") + } + + return instances, len(instances) - 1, nil +} + func (r *Ring) IsHealthy(ingester *IngesterDesc, op Operation, now time.Time) bool { return ingester.IsHealthy(op, r.cfg.HeartbeatTimeout, now) } diff --git a/pkg/ring/replication_strategy_test.go b/pkg/ring/replication_strategy_test.go index 3d47d1cec55..b1e1d49aa16 100644 --- a/pkg/ring/replication_strategy_test.go +++ b/pkg/ring/replication_strategy_test.go @@ -10,93 +10,155 @@ import ( func TestRingReplicationStrategy(t *testing.T) { for i, tc := range []struct { - RF, LiveIngesters, DeadIngesters int - ExpectedMaxFailure int - ExpectedError string + replicationFactor, liveIngesters, deadIngesters int + expectedMaxFailure int + expectedError string }{ // Ensure it works for a single ingester, for local testing. { - RF: 1, - LiveIngesters: 1, - ExpectedMaxFailure: 0, + replicationFactor: 1, + liveIngesters: 1, + expectedMaxFailure: 0, }, { - RF: 1, - DeadIngesters: 1, - ExpectedError: "at least 1 live replicas required, could only find 0", + replicationFactor: 1, + deadIngesters: 1, + expectedError: "at least 1 live replicas required, could only find 0", }, // Ensure it works for RF=3 and 2 ingesters. { - RF: 3, - LiveIngesters: 2, - ExpectedMaxFailure: 0, + replicationFactor: 3, + liveIngesters: 2, + expectedMaxFailure: 0, }, // Ensure it works for the default production config. 
 		{
-			RF:                 3,
-			LiveIngesters:      3,
-			ExpectedMaxFailure: 1,
+			replicationFactor:  3,
+			liveIngesters:      3,
+			expectedMaxFailure: 1,
 		},
 		{
-			RF:                 3,
-			LiveIngesters:      2,
-			DeadIngesters:      1,
-			ExpectedMaxFailure: 0,
+			replicationFactor:  3,
+			liveIngesters:      2,
+			deadIngesters:      1,
+			expectedMaxFailure: 0,
 		},
 		{
-			RF:            3,
-			LiveIngesters: 1,
-			DeadIngesters: 2,
-			ExpectedError: "at least 2 live replicas required, could only find 1",
+			replicationFactor: 3,
+			liveIngesters:     1,
+			deadIngesters:     2,
+			expectedError:     "at least 2 live replicas required, could only find 1",
 		},
 		// Ensure it works when adding / removing nodes.
 		// A node is joining or leaving, replica set expands.
 		{
-			RF:                 3,
-			LiveIngesters:      4,
-			ExpectedMaxFailure: 1,
+			replicationFactor:  3,
+			liveIngesters:      4,
+			expectedMaxFailure: 1,
 		},
 		{
-			RF:                 3,
-			LiveIngesters:      3,
-			DeadIngesters:      1,
-			ExpectedMaxFailure: 0,
+			replicationFactor:  3,
+			liveIngesters:      3,
+			deadIngesters:      1,
+			expectedMaxFailure: 0,
 		},
 		{
-			RF:            3,
-			LiveIngesters: 2,
-			DeadIngesters: 2,
-			ExpectedError: "at least 3 live replicas required, could only find 2",
+			replicationFactor: 3,
+			liveIngesters:     2,
+			deadIngesters:     2,
+			expectedError:     "at least 3 live replicas required, could only find 2",
 		},
 	} {
 		ingesters := []IngesterDesc{}
-		for i := 0; i < tc.LiveIngesters; i++ {
+		for i := 0; i < tc.liveIngesters; i++ {
 			ingesters = append(ingesters, IngesterDesc{
 				Timestamp: time.Now().Unix(),
 			})
 		}
-		for i := 0; i < tc.DeadIngesters; i++ {
+		for i := 0; i < tc.deadIngesters; i++ {
 			ingesters = append(ingesters, IngesterDesc{})
 		}
 
 		t.Run(fmt.Sprintf("[%d]", i), func(t *testing.T) {
 			strategy := NewDefaultReplicationStrategy()
-			liveIngesters, maxFailure, err := strategy.Filter(ingesters, Read, tc.RF, 100*time.Second, false)
-			if tc.ExpectedError == "" {
+			liveIngesters, maxFailure, err := strategy.Filter(ingesters, Read, tc.replicationFactor, 100*time.Second, false)
+			if tc.expectedError == "" {
 				assert.NoError(t, err)
-				assert.Equal(t, tc.LiveIngesters, len(liveIngesters))
-				assert.Equal(t, tc.ExpectedMaxFailure, maxFailure)
+				assert.Equal(t, tc.liveIngesters, len(liveIngesters))
+				assert.Equal(t, tc.expectedMaxFailure, maxFailure)
 			} else {
-				assert.EqualError(t, err, tc.ExpectedError)
+				assert.EqualError(t, err, tc.expectedError)
+			}
+		})
+	}
+}
+
+func TestIgnoreUnhealthyInstancesReplicationStrategy(t *testing.T) {
+	for _, tc := range []struct {
+		name                         string
+		liveIngesters, deadIngesters int
+		expectedMaxFailure           int
+		expectedError                string
+	}{
+		{
+			name:               "with at least 1 healthy instance",
+			liveIngesters:      1,
+			expectedMaxFailure: 0,
+		},
+		{
+			name:               "with more healthy instances than unhealthy",
+			deadIngesters:      1,
+			liveIngesters:      2,
+			expectedMaxFailure: 1,
+		},
+		{
+			name:               "with more unhealthy instances than healthy",
+			deadIngesters:      2,
+			liveIngesters:      1,
+			expectedMaxFailure: 0,
+		},
+		{
+			name:               "with equal number of healthy and unhealthy instances",
+			deadIngesters:      2,
+			liveIngesters:      2,
+			expectedMaxFailure: 1,
+		},
+		{
+			name:               "with no healthy instances",
+			liveIngesters:      0,
+			deadIngesters:      3,
+			expectedMaxFailure: 0,
+			expectedError:      "at least 1 healthy replica required, could only find 0",
+		},
+	} {
+		ingesters := []IngesterDesc{}
+		for i := 0; i < tc.liveIngesters; i++ {
+			ingesters = append(ingesters, IngesterDesc{
+				Timestamp: time.Now().Unix(),
+			})
+		}
+		for i := 0; i < tc.deadIngesters; i++ {
+			ingesters = append(ingesters, IngesterDesc{})
+		}
+
+		t.Run(tc.name, func(t *testing.T) {
+			strategy := NewIgnoreUnhealthyInstancesReplicationStrategy()
+			liveIngesters,
maxFailure, err := strategy.Filter(ingesters, Read, 3, 100*time.Second, false) + if tc.expectedError == "" { + assert.NoError(t, err) + assert.Equal(t, tc.liveIngesters, len(liveIngesters)) + assert.Equal(t, tc.expectedMaxFailure, maxFailure) + } else { + assert.EqualError(t, err, tc.expectedError) } }) } diff --git a/pkg/ruler/ruler.go b/pkg/ruler/ruler.go index ac2105cbf66..f4b93fb5211 100644 --- a/pkg/ruler/ruler.go +++ b/pkg/ruler/ruler.go @@ -280,7 +280,7 @@ func enableSharding(r *Ruler, ringStore kv.Client) error { return errors.Wrap(err, "failed to initialize ruler's lifecycler") } - r.ring, err = ring.NewWithStoreClientAndStrategy(r.cfg.Ring.ToRingConfig(), rulerRingName, ring.RulerRingKey, ringStore, rulerReplicationStrategy{}) + r.ring, err = ring.NewWithStoreClientAndStrategy(r.cfg.Ring.ToRingConfig(), rulerRingName, ring.RulerRingKey, ringStore, ring.NewIgnoreUnhealthyInstancesReplicationStrategy()) if err != nil { return errors.Wrap(err, "failed to initialize ruler's ring") } diff --git a/pkg/ruler/ruler_replication_strategy.go b/pkg/ruler/ruler_replication_strategy.go deleted file mode 100644 index fe98d8504ff..00000000000 --- a/pkg/ruler/ruler_replication_strategy.go +++ /dev/null @@ -1,36 +0,0 @@ -package ruler - -import ( - "time" - - "github.com/pkg/errors" - - "github.com/cortexproject/cortex/pkg/ring" -) - -// RingOp is the operation used for distributing rule groups between rulers. -var RingOp = ring.NewOp([]ring.IngesterState{ring.ACTIVE}, func(s ring.IngesterState) bool { - // Only ACTIVE rulers get any rule groups. If instance is not ACTIVE, we need to find another ruler. - return s != ring.ACTIVE -}) - -type rulerReplicationStrategy struct{} - -func (r rulerReplicationStrategy) Filter(instances []ring.IngesterDesc, op ring.Operation, _ int, heartbeatTimeout time.Duration, _ bool) (healthy []ring.IngesterDesc, maxFailures int, err error) { - now := time.Now() - - // Filter out unhealthy instances. - for i := 0; i < len(instances); { - if instances[i].IsHealthy(op, heartbeatTimeout, now) { - i++ - } else { - instances = append(instances[:i], instances[i+1:]...) - } - } - - if len(instances) == 0 { - return nil, 0, errors.New("no healthy ruler instance found for the replication set") - } - - return instances, len(instances) - 1, nil -} diff --git a/pkg/ruler/ruler_ring.go b/pkg/ruler/ruler_ring.go index 33881bd8bbe..3bd18e9a247 100644 --- a/pkg/ruler/ruler_ring.go +++ b/pkg/ruler/ruler_ring.go @@ -21,6 +21,12 @@ const ( ringAutoForgetUnhealthyPeriods = 2 ) +// RingOp is the operation used for distributing rule groups between rulers. +var RingOp = ring.NewOp([]ring.IngesterState{ring.ACTIVE}, func(s ring.IngesterState) bool { + // Only ACTIVE rulers get any rule groups. If instance is not ACTIVE, we need to find another ruler. + return s != ring.ACTIVE +}) + // RingConfig masks the ring lifecycler config which contains // many options not really required by the rulers ring. 
This config // is used to strip down the config to the minimum, and avoid confusion diff --git a/pkg/storegateway/gateway.go b/pkg/storegateway/gateway.go index 3ffd535aa01..182f5cefb69 100644 --- a/pkg/storegateway/gateway.go +++ b/pkg/storegateway/gateway.go @@ -163,7 +163,7 @@ func newStoreGateway(gatewayCfg Config, storageCfg cortex_tsdb.BlocksStorageConf } ringCfg := gatewayCfg.ShardingRing.ToRingConfig() - g.ring, err = ring.NewWithStoreClientAndStrategy(ringCfg, RingNameForServer, RingKey, ringStore, &BlocksReplicationStrategy{}) + g.ring, err = ring.NewWithStoreClientAndStrategy(ringCfg, RingNameForServer, RingKey, ringStore, ring.NewIgnoreUnhealthyInstancesReplicationStrategy()) if err != nil { return nil, errors.Wrap(err, "create ring client") } diff --git a/pkg/storegateway/gateway_ring.go b/pkg/storegateway/gateway_ring.go index 7d1d24e6df2..e977c84ca76 100644 --- a/pkg/storegateway/gateway_ring.go +++ b/pkg/storegateway/gateway_ring.go @@ -30,6 +30,20 @@ const ( RingNumTokens = 512 ) +var ( + // BlocksSync is the operation run by the store-gateway to sync blocks. + BlocksSync = ring.NewOp([]ring.IngesterState{ring.JOINING, ring.ACTIVE, ring.LEAVING}, func(s ring.IngesterState) bool { + // If the instance is JOINING or LEAVING we should extend the replica set: + // - JOINING: the previous replica set should be kept while an instance is JOINING + // - LEAVING: the instance is going to be decommissioned soon so we need to include + // another replica in the set + return s == ring.JOINING || s == ring.LEAVING + }) + + // BlocksRead is the operation run by the querier to query blocks via the store-gateway. + BlocksRead = ring.NewOp([]ring.IngesterState{ring.ACTIVE}, nil) +) + // RingConfig masks the ring lifecycler config which contains // many options not really required by the store gateways ring. This config // is used to strip down the config to the minimum, and avoid confusion diff --git a/pkg/storegateway/replication_strategy_test.go b/pkg/storegateway/gateway_ring_test.go similarity index 100% rename from pkg/storegateway/replication_strategy_test.go rename to pkg/storegateway/gateway_ring_test.go diff --git a/pkg/storegateway/replication_strategy.go b/pkg/storegateway/replication_strategy.go deleted file mode 100644 index a293df36141..00000000000 --- a/pkg/storegateway/replication_strategy.go +++ /dev/null @@ -1,46 +0,0 @@ -package storegateway - -import ( - "errors" - "time" - - "github.com/cortexproject/cortex/pkg/ring" -) - -var ( - // BlocksSync is the operation run by the store-gateway to sync blocks. - BlocksSync = ring.NewOp([]ring.IngesterState{ring.JOINING, ring.ACTIVE, ring.LEAVING}, func(s ring.IngesterState) bool { - // If the instance is JOINING or LEAVING we should extend the replica set: - // - JOINING: the previous replica set should be kept while an instance is JOINING - // - LEAVING: the instance is going to be decommissioned soon so we need to include - // another replica in the set - return s == ring.JOINING || s == ring.LEAVING - }) - - // BlocksRead is the operation run by the querier to query blocks via the store-gateway. - BlocksRead = ring.NewOp([]ring.IngesterState{ring.ACTIVE}, nil) -) - -type BlocksReplicationStrategy struct{} - -func (s *BlocksReplicationStrategy) Filter(instances []ring.IngesterDesc, op ring.Operation, _ int, heartbeatTimeout time.Duration, _ bool) ([]ring.IngesterDesc, int, error) { - now := time.Now() - - // Filter out unhealthy instances. 
- for i := 0; i < len(instances); { - if instances[i].IsHealthy(op, heartbeatTimeout, now) { - i++ - } else { - instances = append(instances[:i], instances[i+1:]...) - } - } - - // For the store-gateway use case we need that a block is loaded at least on - // 1 instance, no matter what is the replication factor set (no quorum logic). - if len(instances) == 0 { - return nil, 0, errors.New("no healthy store-gateway instance found for the replication set") - } - - maxFailures := len(instances) - 1 - return instances, maxFailures, nil -} diff --git a/pkg/storegateway/sharding_strategy_test.go b/pkg/storegateway/sharding_strategy_test.go index b462ff4f907..cfa09d17fae 100644 --- a/pkg/storegateway/sharding_strategy_test.go +++ b/pkg/storegateway/sharding_strategy_test.go @@ -257,7 +257,7 @@ func TestDefaultShardingStrategy(t *testing.T) { ZoneAwarenessEnabled: testData.zoneAwarenessEnabled, } - r, err := ring.NewWithStoreClientAndStrategy(cfg, "test", "test", store, &BlocksReplicationStrategy{}) + r, err := ring.NewWithStoreClientAndStrategy(cfg, "test", "test", store, ring.NewIgnoreUnhealthyInstancesReplicationStrategy()) require.NoError(t, err) require.NoError(t, services.StartAndAwaitRunning(ctx, r)) defer services.StopAndAwaitTerminated(ctx, r) //nolint:errcheck @@ -614,7 +614,7 @@ func TestShuffleShardingStrategy(t *testing.T) { SubringCacheDisabled: true, } - r, err := ring.NewWithStoreClientAndStrategy(cfg, "test", "test", store, &BlocksReplicationStrategy{}) + r, err := ring.NewWithStoreClientAndStrategy(cfg, "test", "test", store, ring.NewIgnoreUnhealthyInstancesReplicationStrategy()) require.NoError(t, err) require.NoError(t, services.StartAndAwaitRunning(ctx, r)) defer services.StopAndAwaitTerminated(ctx, r) //nolint:errcheck
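
The hunks above replace the ruler- and store-gateway-specific replication strategies with the shared `ring.NewIgnoreUnhealthyInstancesReplicationStrategy()`, which drops unhealthy instances and only fails when none are left. The following standalone sketch is not part of the patch; it simply illustrates that behaviour, assuming the `ring` package API as it appears in this diff (`ring.IngesterDesc`, the `ring.Read` operation and the new constructor). Addresses and timeouts are invented for the example.

```go
package main

import (
	"fmt"
	"time"

	"github.com/cortexproject/cortex/pkg/ring"
)

func main() {
	now := time.Now()

	// Two healthy instances (recent heartbeat) and one whose heartbeat is stale.
	// The addresses are purely illustrative.
	instances := []ring.IngesterDesc{
		{Addr: "10.0.0.1", State: ring.ACTIVE, Timestamp: now.Unix()},
		{Addr: "10.0.0.2", State: ring.ACTIVE, Timestamp: now.Unix()},
		{Addr: "10.0.0.3", State: ring.ACTIVE, Timestamp: now.Add(-time.Hour).Unix()},
	}

	strategy := ring.NewIgnoreUnhealthyInstancesReplicationStrategy()

	// The replication factor argument (3 here) is ignored by this strategy: it only
	// filters out unhealthy instances, and the caller may tolerate up to
	// len(healthy)-1 failures among the remaining ones.
	healthy, maxFailures, err := strategy.Filter(instances, ring.Read, 3, time.Minute, false)
	fmt.Println(len(healthy), maxFailures, err) // 2 1 <nil>

	// With no healthy instance at all, Filter returns an error rather than an empty set.
	stale := []ring.IngesterDesc{{Addr: "10.0.0.4", State: ring.ACTIVE, Timestamp: now.Add(-time.Hour).Unix()}}
	_, _, err = strategy.Filter(stale, ring.Read, 3, time.Minute, false)
	fmt.Println(err) // at least 1 healthy replica required, could only find 0
}
```

Because none of the querier, ruler or store-gateway rings needs quorum (each key only has to be served by at least one healthy instance), the three components can share this single strategy instead of maintaining their own copies.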