diff --git a/CHANGELOG.md b/CHANGELOG.md index 3e5caea46a2..2f72089d9b9 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -63,12 +63,14 @@ * [ENHANCEMENT] Reduce tail latency by smoothing out spikes in rate of chunk flush operations. #3191 * [ENHANCEMENT] Experimental Ruler API: Fetch rule groups from object storage in parallel. #3218 * [ENHANCEMENT] Chunks GCS object storage client uses the `fields` selector to limit the payload size when listing objects in the bucket. #3218 +* [ENHANCEMENT] Added shuffle sharding support to ruler. Added new metric `cortex_ruler_sync_rules_total`. #3235 * [BUGFIX] No-longer-needed ingester operations for queries triggered by queriers and rulers are now canceled. #3178 * [BUGFIX] Ruler: directories in the configured `rules-path` will be removed on startup and shutdown in order to ensure they don't persist between runs. #3195 * [BUGFIX] Handle hash-collisions in the query path. #3192 * [BUGFIX] Check for postgres rows errors. #3197 * [BUGFIX] Ruler Experimental API: Don't allow rule groups without names or empty rule groups. #3210 * [BUGFIX] Experimental Alertmanager API: Do not allow empty Alertmanager configurations or bad template filenames to be submitted through the configuration API. #3185 +* [BUGFIX] When using ruler sharding, moving all user rule groups from ruler to a different one and then back could end up with some user groups not being evaluated at all. #3235 ## 1.4.0-rc.0 in progress diff --git a/development/tsdb-blocks-storage-s3/.data-configstore/api/prom/configs/rules b/development/tsdb-blocks-storage-s3/.data-configstore/api/prom/configs/rules deleted file mode 100644 index fa6bcd37a3b..00000000000 --- a/development/tsdb-blocks-storage-s3/.data-configstore/api/prom/configs/rules +++ /dev/null @@ -1,14 +0,0 @@ -{ - "since": 2, - "configs": { - "fake": { - "id": 1, - "config": { - "rule_format_version": "2", - "rules_files": { - "test": "groups:\n - name: group-1\n rules:\n - record: avalanche_metric_mmmmm_0_0:count\n expr: count(avalanche_metric_mmmmm_0_0)\n - name: group-2\n rules:\n - record: avalanche_metric_mmmmm_0:count\n expr: |\n count(avalanche_metric_mmmmm_0_0) +\n count(avalanche_metric_mmmmm_0_1)\n - alert: alert-1\n expr: |\n count(avalanche_metric_mmmmm_0_0) > 1000\n - alert: alert-2\n expr: count(avalanche_metric_mmmmm_0_0) > 100\n for: 5m\n labels:\n label1: value1\n label2: value2\n annotations:\n annotation1: value1\n annotation2: value2\n" - } - } - } - } -} diff --git a/development/tsdb-blocks-storage-s3/.data-configstore/api/prom/configs/rules.yaml b/development/tsdb-blocks-storage-s3/.data-configstore/api/prom/configs/rules.yaml deleted file mode 100644 index 3fd54ff17a1..00000000000 --- a/development/tsdb-blocks-storage-s3/.data-configstore/api/prom/configs/rules.yaml +++ /dev/null @@ -1,23 +0,0 @@ -groups: - - name: group-1 - rules: - - record: avalanche_metric_mmmmm_0_0:count - expr: count(avalanche_metric_mmmmm_0_0) - - name: group-2 - rules: - - record: avalanche_metric_mmmmm_0:count - expr: | - count(avalanche_metric_mmmmm_0_0) + - count(avalanche_metric_mmmmm_0_1) - - alert: alert-1 - expr: | - count(avalanche_metric_mmmmm_0_0) > 1000 - - alert: alert-2 - expr: count(avalanche_metric_mmmmm_0_0) > 100 - for: 5m - labels: - label1: value1 - label2: value2 - annotations: - annotation1: value1 - annotation2: value2 diff --git a/development/tsdb-blocks-storage-s3/.data-minio/.gitignore b/development/tsdb-blocks-storage-s3/.data-minio/.gitignore index f8872cf1422..521c201bb0a 100644 --- 
a/development/tsdb-blocks-storage-s3/.data-minio/.gitignore +++ b/development/tsdb-blocks-storage-s3/.data-minio/.gitignore @@ -2,3 +2,4 @@ !.gitignore !cortex-tsdb !cortex-alertmanager +!cortex-ruler diff --git a/development/tsdb-blocks-storage-s3/.data-minio/cortex-ruler/.gitignore b/development/tsdb-blocks-storage-s3/.data-minio/cortex-ruler/.gitignore new file mode 100644 index 00000000000..d6b7ef32c84 --- /dev/null +++ b/development/tsdb-blocks-storage-s3/.data-minio/cortex-ruler/.gitignore @@ -0,0 +1,2 @@ +* +!.gitignore diff --git a/development/tsdb-blocks-storage-s3/config/cortex.yaml b/development/tsdb-blocks-storage-s3/config/cortex.yaml index 3a57de2cf38..cd1fc9117a1 100644 --- a/development/tsdb-blocks-storage-s3/config/cortex.yaml +++ b/development/tsdb-blocks-storage-s3/config/cortex.yaml @@ -70,9 +70,19 @@ blocks_storage: ruler: enable_api: true storage: - type: configdb - configdb: - configs_api_url: http://configstore:80/ + type: s3 + s3: + bucketnames: cortex-ruler + s3forcepathstyle: true + s3: http://cortex:supersecret@minio.:9000 + enable_sharding: true + ring: + heartbeat_period: 5s + heartbeat_timeout: 15s + kvstore: + store: consul + consul: + host: consul:8500 alertmanager: enable_api: true diff --git a/development/tsdb-blocks-storage-s3/config/grafana-agent.yaml b/development/tsdb-blocks-storage-s3/config/grafana-agent.yaml index 9b62fdf0550..245cc90d858 100644 --- a/development/tsdb-blocks-storage-s3/config/grafana-agent.yaml +++ b/development/tsdb-blocks-storage-s3/config/grafana-agent.yaml @@ -31,7 +31,7 @@ prometheus: namespace: 'tsdb-blocks-storage-s3' - job_name: tsdb-blocks-storage-s3/ruler static_configs: - - targets: ['ruler:8005'] + - targets: ['ruler-1:8021', 'ruler-2:8022'] labels: cluster: 'docker-compose' namespace: 'tsdb-blocks-storage-s3' diff --git a/development/tsdb-blocks-storage-s3/config/prometheus.yaml b/development/tsdb-blocks-storage-s3/config/prometheus.yaml index ffd1c68b1f1..ae916087aba 100644 --- a/development/tsdb-blocks-storage-s3/config/prometheus.yaml +++ b/development/tsdb-blocks-storage-s3/config/prometheus.yaml @@ -24,7 +24,7 @@ scrape_configs: namespace: 'tsdb-blocks-storage-s3' - job_name: tsdb-blocks-storage-s3/ruler static_configs: - - targets: ['ruler:8005'] + - targets: ['ruler-1:8021', 'ruler-2:8022'] labels: cluster: 'docker-compose' namespace: 'tsdb-blocks-storage-s3' diff --git a/development/tsdb-blocks-storage-s3/docker-compose.yml b/development/tsdb-blocks-storage-s3/docker-compose.yml index 9575fc4481a..d9313f6f883 100644 --- a/development/tsdb-blocks-storage-s3/docker-compose.yml +++ b/development/tsdb-blocks-storage-s3/docker-compose.yml @@ -21,11 +21,6 @@ services: memcached: image: memcached:1.6 - configstore: - image: nginx - volumes: - - .data-configstore:/usr/share/nginx/html/private:ro - prometheus: image: prom/prometheus:v2.16.0 command: ["--config.file=/etc/prometheus/prometheus.yaml"] @@ -178,80 +173,101 @@ services: volumes: - ./config:/cortex/config - ruler: + compactor: build: context: . 
dockerfile: dev.dockerfile image: cortex - command: ["sh", "-c", "sleep 3 && exec ./dlv exec ./cortex --listen=:18005 --headless=true --api-version=2 --accept-multiclient --continue -- -config.file=./config/cortex.yaml -target=ruler -server.http-listen-port=8005 -server.grpc-listen-port=9005"] + command: ["sh", "-c", "sleep 3 && exec ./dlv exec ./cortex --listen=:18006 --headless=true --api-version=2 --accept-multiclient --continue -- -config.file=./config/cortex.yaml -target=compactor -server.http-listen-port=8006 -server.grpc-listen-port=9006"] depends_on: - consul - minio environment: - JAEGER_AGENT_HOST=jaeger - JAEGER_AGENT_PORT=6831 - - JAEGER_TAGS=app=ruler + - JAEGER_TAGS=app=compactor - JAEGER_SAMPLER_TYPE=const - JAEGER_SAMPLER_PARAM=1 ports: - - 8005:8005 - - 18005:18005 + - 8006:8006 + - 18006:18006 volumes: - ./config:/cortex/config - compactor: + query-frontend: build: context: . dockerfile: dev.dockerfile image: cortex - command: ["sh", "-c", "sleep 3 && exec ./dlv exec ./cortex --listen=:18006 --headless=true --api-version=2 --accept-multiclient --continue -- -config.file=./config/cortex.yaml -target=compactor -server.http-listen-port=8006 -server.grpc-listen-port=9006"] + command: ["sh", "-c", "sleep 3 && exec ./dlv exec ./cortex --listen=:18007 --headless=true --api-version=2 --accept-multiclient --continue -- -config.file=./config/cortex.yaml -target=query-frontend -server.http-listen-port=8007 -server.grpc-listen-port=9007 -store.max-query-length=8760h"] depends_on: - consul - minio environment: - JAEGER_AGENT_HOST=jaeger - JAEGER_AGENT_PORT=6831 - - JAEGER_TAGS=app=compactor + - JAEGER_TAGS=app=query-frontend - JAEGER_SAMPLER_TYPE=const - JAEGER_SAMPLER_PARAM=1 ports: - - 8006:8006 - - 18006:18006 + - 8007:8007 + - 18007:18007 volumes: - ./config:/cortex/config - query-frontend: + alertmanager: build: context: . dockerfile: dev.dockerfile image: cortex - command: ["sh", "-c", "sleep 3 && exec ./dlv exec ./cortex --listen=:18007 --headless=true --api-version=2 --accept-multiclient --continue -- -config.file=./config/cortex.yaml -target=query-frontend -server.http-listen-port=8007 -server.grpc-listen-port=9007 -store.max-query-length=8760h"] + command: ["sh", "-c", "sleep 3 && exec ./dlv exec ./cortex --listen=:18010 --headless=true --api-version=2 --accept-multiclient --continue -- -config.file=./config/cortex.yaml -target=alertmanager -server.http-listen-port=8010 -server.grpc-listen-port=9010"] + depends_on: + - consul + - minio + ports: + - 8010:8010 + - 18010:18010 + volumes: + - ./config:/cortex/config + + ruler-1: + build: + context: . + dockerfile: dev.dockerfile + image: cortex + command: ["sh", "-c", "sleep 3 && exec ./dlv exec ./cortex --listen=:18021 --headless=true --api-version=2 --accept-multiclient --continue -- -config.file=./config/cortex.yaml -target=ruler -server.http-listen-port=8021 -server.grpc-listen-port=9021"] depends_on: - consul - minio environment: - JAEGER_AGENT_HOST=jaeger - JAEGER_AGENT_PORT=6831 - - JAEGER_TAGS=app=query-frontend + - JAEGER_TAGS=app=ruler-1 - JAEGER_SAMPLER_TYPE=const - JAEGER_SAMPLER_PARAM=1 ports: - - 8007:8007 - - 18007:18007 + - 8021:8021 + - 18021:18021 volumes: - ./config:/cortex/config - alertmanager: + ruler-2: build: context: . 
dockerfile: dev.dockerfile image: cortex - command: ["sh", "-c", "sleep 3 && exec ./dlv exec ./cortex --listen=:18010 --headless=true --api-version=2 --accept-multiclient --continue -- -config.file=./config/cortex.yaml -target=alertmanager -server.http-listen-port=8010 -server.grpc-listen-port=9010"] + command: ["sh", "-c", "sleep 3 && exec ./dlv exec ./cortex --listen=:18022 --headless=true --api-version=2 --accept-multiclient --continue -- -config.file=./config/cortex.yaml -target=ruler -server.http-listen-port=8022 -server.grpc-listen-port=9022"] depends_on: - consul - minio + environment: + - JAEGER_AGENT_HOST=jaeger + - JAEGER_AGENT_PORT=6831 + - JAEGER_TAGS=app=ruler-2 + - JAEGER_SAMPLER_TYPE=const + - JAEGER_SAMPLER_PARAM=1 ports: - - 8010:8010 - - 18010:18010 + - 8022:8022 + - 18022:18022 volumes: - ./config:/cortex/config diff --git a/development/tsdb-blocks-storage-s3/goland/ruler-1.run.xml b/development/tsdb-blocks-storage-s3/goland/ruler-1.run.xml new file mode 100644 index 00000000000..d5ed8e78a80 --- /dev/null +++ b/development/tsdb-blocks-storage-s3/goland/ruler-1.run.xml @@ -0,0 +1,5 @@ + + + + + \ No newline at end of file diff --git a/development/tsdb-blocks-storage-s3/goland/ruler-2.run.xml b/development/tsdb-blocks-storage-s3/goland/ruler-2.run.xml new file mode 100644 index 00000000000..44f79aa013d --- /dev/null +++ b/development/tsdb-blocks-storage-s3/goland/ruler-2.run.xml @@ -0,0 +1,5 @@ + + + + + \ No newline at end of file diff --git a/docs/configuration/config-file-reference.md b/docs/configuration/config-file-reference.md index a0623f3a761..703c345d9e5 100644 --- a/docs/configuration/config-file-reference.md +++ b/docs/configuration/config-file-reference.md @@ -1117,6 +1117,10 @@ storage: # CLI flag: -ruler.enable-sharding [enable_sharding: | default = false] +# The sharding strategy to use. Supported values are: default, shuffle-sharding. +# CLI flag: -ruler.sharding-strategy +[sharding_strategy: | default = "default"] + # Time to spend searching for a pending ruler when shutting down. # CLI flag: -ruler.search-pending-for [search_pending_for: | default = 5m] @@ -2866,6 +2870,12 @@ The `limits_config` configures default and per-tenant limits imposed by Cortex s # CLI flag: -ruler.evaluation-delay-duration [ruler_evaluation_delay_duration: | default = 0s] +# The default tenant's shard size when the shuffle-sharding strategy is used by +# ruler. When this setting is specified in the per-tenant overrides, a value of +# 0 disables shuffle sharding for the tenant. +# CLI flag: -ruler.tenant-shard-size +[ruler_tenant_shard_size: | default = 0] + # The default tenant's shard size when the shuffle-sharding strategy is used. # Must be set when the store-gateway sharding is enabled with the # shuffle-sharding strategy. When this setting is specified in the per-tenant diff --git a/docs/guides/shuffle-sharding.md b/docs/guides/shuffle-sharding.md index 2179704f09a..7bdfa001b04 100644 --- a/docs/guides/shuffle-sharding.md +++ b/docs/guides/shuffle-sharding.md @@ -51,6 +51,7 @@ Cortex currently supports shuffle sharding in the following services: - [Ingesters](#ingesters-shuffle-sharding) - [Query-frontend](#query-frontend-shuffle-sharding) - [Store-gateway](#store-gateway-shuffle-sharding) +- [Ruler](#ruler-shuffle-sharding) Shuffle sharding is **disabled by default** and needs to be explicitly enabled in the configuration. 
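The `ruler_tenant_shard_size` limit documented above only takes effect when `-ruler.sharding-strategy=shuffle-sharding` is enabled on the ruler. As a rough sketch of how the two settings interact (mirroring the `loadRulesShuffleSharding` logic added later in this diff), the snippet below uses simplified stand-in interfaces and a hypothetical `ringForTenant` helper rather than the real Cortex ring and limits types:

```go
package example

// ReadRing and RulesLimits are minimal stand-ins for the Cortex types used by
// the ruler; only the methods needed for this sketch are included.
type ReadRing interface {
	ShuffleShard(identifier string, size int) ReadRing
	HasInstance(instanceID string) bool
}

type RulesLimits interface {
	RulerTenantShardSize(userID string) int
}

// ringForTenant (hypothetical helper) returns the ring used to shard a tenant's
// rule groups and whether this ruler instance should load that tenant at all.
func ringForTenant(full ReadRing, limits RulesLimits, instanceID, userID string) (ReadRing, bool) {
	shardSize := limits.RulerTenantShardSize(userID)
	if shardSize <= 0 {
		// A shard size of 0 disables shuffle sharding for this tenant: its rule
		// groups are sharded across all rulers in the full ring.
		return full, true
	}

	// Build the tenant's sub-ring; only rulers that are part of it load and
	// evaluate this tenant's rule groups.
	sub := full.ShuffleShard(userID, shardSize)
	return sub, sub.HasInstance(instanceID)
}
```

In other words, setting `ruler_tenant_shard_size: 0` for a tenant in the overrides opts that tenant out of shuffle sharding while leaving other tenants unaffected.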
@@ -92,3 +93,13 @@ When shuffle sharding is **enabled** via `-store-gateway.sharding-strategy=shuff _The shard size can be overridden on a per-tenant basis setting `store_gateway_tenant_shard_size` in the limits overrides configuration._ _Please check out the [store-gateway documentation](../blocks-storage/store-gateway.md) for more information about how it works._ + +### Ruler shuffle sharding + +The Cortex ruler can run in three modes: + +1. **No sharding at all.** This is the most basic mode of the ruler. It is activated by using `-ruler.enable-sharding=false` (default) and works correctly only if a single ruler is running. In this mode the ruler loads all rules for all tenants. +2. **Default sharding**, activated by using `-ruler.enable-sharding=true` and `-ruler.sharding-strategy=default` (default). In this mode rulers register themselves into the ring. Each ruler will then select and evaluate only those rules that it "owns". +3. **Shuffle sharding**, activated by using `-ruler.enable-sharding=true` and `-ruler.sharding-strategy=shuffle-sharding`. Similarly to default sharding, rulers use the ring to distribute the workload, but rule groups for each tenant can only be evaluated on a limited number of rulers (`-ruler.tenant-shard-size`; it can also be overridden per tenant as `ruler_tenant_shard_size` in the limits overrides). + +Note that when sharding is enabled, each rule group is evaluated by a single ruler only; there is no replication. diff --git a/integration/chunks_storage_backends_test.go b/integration/chunks_storage_backends_test.go index 91f3e62b1fc..79d3f18e13b 100644 --- a/integration/chunks_storage_backends_test.go +++ b/integration/chunks_storage_backends_test.go @@ -260,7 +260,7 @@ func TestSwiftRuleStorage(t *testing.T) { require.NoError(t, err) // Get rules back. - rls, err := store.ListAllRuleGroups(ctx) + rls, err := store.LoadAllRuleGroups(ctx) require.NoError(t, err) require.Equal(t, 2, len(rls[userID])) @@ -274,7 +274,7 @@ func TestSwiftRuleStorage(t *testing.T) { require.NoError(t, err) //Verify we only have the second rule group - rls, err = store.ListAllRuleGroups(ctx) + rls, err = store.LoadAllRuleGroups(ctx) require.NoError(t, err) require.Equal(t, 1, len(rls[userID])) require.Equal(t, r2, rls[userID][0]) diff --git a/pkg/chunk/storage_client.go b/pkg/chunk/storage_client.go index 3ea2f479594..1ad88bb34bb 100644 --- a/pkg/chunk/storage_client.go +++ b/pkg/chunk/storage_client.go @@ -58,7 +58,7 @@ type ReadBatchIterator interface { Value() []byte } -// ObjectClient is used to store arbitrary data in Object Store (S3/GCS/Azure/Etc) +// ObjectClient is used to store arbitrary data in Object Store (S3/GCS/Azure/...) type ObjectClient interface { PutObject(ctx context.Context, objectKey string, object io.ReadSeeker) error GetObject(ctx context.Context, objectKey string) (io.ReadCloser, error) @@ -85,4 +85,5 @@ type StorageObject struct { } // StorageCommonPrefix represents a common prefix aka a synthetic directory in Object Store. +// It is guaranteed to always end with the delimiter passed to the List method.
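The note above that each rule group is evaluated by exactly one ruler follows from two pieces of this PR: the ruler ring keeps a replication factor of 1, and every rule group is mapped to a single ring token by hashing its user, namespace and name. The snippet below is a simplified, self-contained version of the `tokenForGroup` helper added in `pkg/ruler/ruler.go` further down in this diff; the real function takes a `*RuleGroupDesc` instead of plain strings:

```go
package example

import "hash/fnv"

// tokenForGroup hashes "<user>/<namespace>/<group name>" with FNV-32a, the same
// scheme the ruler uses to place a rule group on its ring. The resulting token
// is looked up in the ring (the default ring or the tenant's shuffle shard) and
// the first instance returned is the one that owns and evaluates the group.
func tokenForGroup(user, namespace, name string) uint32 {
	h := fnv.New32a()
	// Writes to an FNV hasher never return an error.
	_, _ = h.Write([]byte(user))
	_, _ = h.Write([]byte("/"))
	_, _ = h.Write([]byte(namespace))
	_, _ = h.Write([]byte("/"))
	_, _ = h.Write([]byte(name))
	return h.Sum32()
}
```

Because the token depends only on the group's identity, ownership of a group only moves when the ring itself changes (rulers joining, leaving or becoming unhealthy), which is what the periodic ring-change check added in this PR reacts to.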
type StorageCommonPrefix string diff --git a/pkg/cortex/cortex.go b/pkg/cortex/cortex.go index 5575a76520e..0704f39f8eb 100644 --- a/pkg/cortex/cortex.go +++ b/pkg/cortex/cortex.go @@ -165,7 +165,7 @@ func (c *Config) Validate(log log.Logger) error { if err := c.ChunkStore.Validate(); err != nil { return errors.Wrap(err, "invalid chunk store config") } - if err := c.Ruler.Validate(); err != nil { + if err := c.Ruler.Validate(c.LimitsConfig); err != nil { return errors.Wrap(err, "invalid ruler config") } if err := c.BlocksStorage.Validate(); err != nil { diff --git a/pkg/cortex/modules.go b/pkg/cortex/modules.go index 7c2a98a94e2..b6e5ffb20c5 100644 --- a/pkg/cortex/modules.go +++ b/pkg/cortex/modules.go @@ -542,6 +542,7 @@ func (t *Cortex) initRuler() (serv services.Service, err error) { prometheus.DefaultRegisterer, util.Logger, t.RulerStorage, + t.Overrides, ) if err != nil { return diff --git a/pkg/querier/frontend/frontend_querier_queues.go b/pkg/querier/frontend/frontend_querier_queues.go index f15da305ecd..339b5eeec5f 100644 --- a/pkg/querier/frontend/frontend_querier_queues.go +++ b/pkg/querier/frontend/frontend_querier_queues.go @@ -1,10 +1,10 @@ package frontend import ( - "crypto/md5" - "encoding/binary" "math/rand" "sort" + + "github.com/cortexproject/cortex/pkg/util" ) // This struct holds user queues for pending requests. It also keeps track of connected queriers, @@ -89,7 +89,7 @@ func (q *queues) getOrAddQueue(userID string, maxQueriers int) chan *request { if uq == nil { uq = &userQueue{ ch: make(chan *request, q.maxUserQueueSize), - seed: getSeedForUser(userID), + seed: util.ShuffleShardSeed(userID), index: -1, } q.userQueues[userID] = uq @@ -222,10 +222,3 @@ func shuffleQueriersForUser(userSeed int64, queriersToSelect int, allSortedQueri return result } - -func getSeedForUser(user string) int64 { - d := md5.New() - _, _ = d.Write([]byte(user)) - buf := d.Sum(nil) - return int64(binary.BigEndian.Uint64(buf)) -} diff --git a/pkg/ring/model.go b/pkg/ring/model.go index b67e8d46bf7..19ab91c9e53 100644 --- a/pkg/ring/model.go +++ b/pkg/ring/model.go @@ -162,6 +162,9 @@ func (i *IngesterDesc) IsHealthy(op Operation, heartbeatTimeout time.Duration) b case BlocksRead: healthy = i.State == ACTIVE + + case Ruler: + healthy = i.State == ACTIVE } return healthy && time.Since(time.Unix(i.Timestamp, 0)) <= heartbeatTimeout diff --git a/pkg/ring/replication_set.go b/pkg/ring/replication_set.go index f141ff6e5d3..686c012f8e4 100644 --- a/pkg/ring/replication_set.go +++ b/pkg/ring/replication_set.go @@ -2,6 +2,7 @@ package ring import ( "context" + "sort" "time" ) @@ -83,3 +84,32 @@ func (r ReplicationSet) Includes(addr string) bool { return false } + +// HasReplicationSetChanged returns true if the two replication sets differ in any way (number of instances, +// instance states, tokens, zones, ...), ignoring the heartbeat timestamps; it returns false if they are the same. +func HasReplicationSetChanged(before, after ReplicationSet) bool { + beforeInstances := before.Ingesters + afterInstances := after.Ingesters + + if len(beforeInstances) != len(afterInstances) { + return true + } + + sort.Sort(ByAddr(beforeInstances)) + sort.Sort(ByAddr(afterInstances)) + + for i := 0; i < len(beforeInstances); i++ { + b := beforeInstances[i] + a := afterInstances[i] + + // Exclude the heartbeat timestamp from the comparison.
+ b.Timestamp = 0 + a.Timestamp = 0 + + if !b.Equal(a) { + return true + } + } + + return false +} diff --git a/pkg/ring/ring.go b/pkg/ring/ring.go index 4e1a8b50b8c..354a861861c 100644 --- a/pkg/ring/ring.go +++ b/pkg/ring/ring.go @@ -4,8 +4,6 @@ package ring import ( "context" - "crypto/md5" - "encoding/binary" "errors" "flag" "fmt" @@ -72,6 +70,8 @@ const ( // BlocksRead is the operation run by the querier to query blocks via the store-gateway. BlocksRead + + Ruler // Used for distributing rule groups between rulers. ) var ( @@ -209,7 +209,7 @@ func NewWithStoreClientAndStrategy(cfg Config, name, key string, store kv.Client func (r *Ring) loop(ctx context.Context) error { r.KVClient.WatchKey(ctx, r.key, func(value interface{}) bool { if value == nil { - level.Info(util.Logger).Log("msg", "ring doesn't exist in consul yet") + level.Info(util.Logger).Log("msg", "ring doesn't exist in KV store yet") return true } @@ -471,16 +471,8 @@ func (r *Ring) ShuffleShard(identifier string, size int) ReadRing { return cached } - // Use the identifier to compute an hash we'll use to seed the random. - hasher := md5.New() - hasher.Write([]byte(identifier)) // nolint:errcheck - checksum := hasher.Sum(nil) - - // Generate the seed based on the first 64 bits of the checksum. - seed := int64(binary.BigEndian.Uint64(checksum)) - // Initialise the random generator used to select instances in the ring. - random := rand.New(rand.NewSource(seed)) + random := rand.New(rand.NewSource(util.ShuffleShardSeed(identifier))) var result *Ring diff --git a/pkg/ruler/api.go b/pkg/ruler/api.go index dba9e15d390..4e8548a381e 100644 --- a/pkg/ruler/api.go +++ b/pkg/ruler/api.go @@ -377,7 +377,7 @@ func (r *Ruler) ListRules(w http.ResponseWriter, req *http.Request) { } level.Debug(logger).Log("msg", "retrieving rule groups with namespace", "userID", userID, "namespace", namespace) - rgs, err := r.store.ListRuleGroups(req.Context(), userID, namespace) + rgs, err := r.store.LoadRuleGroupsForUserAndNamespace(req.Context(), userID, namespace) if err != nil { http.Error(w, err.Error(), http.StatusBadRequest) return diff --git a/pkg/ruler/compat.go b/pkg/ruler/compat.go index 494e81b5de0..0d60f499b6d 100644 --- a/pkg/ruler/compat.go +++ b/pkg/ruler/compat.go @@ -72,10 +72,10 @@ func (t *PusherAppendable) Appender(ctx context.Context) storage.Appender { } } -// RulesLimits is the one function we need from limits.Overrides, and -// is here to limit coupling. +// RulesLimits defines limits used by Ruler. type RulesLimits interface { - EvaluationDelay(usedID string) time.Duration + EvaluationDelay(userID string) time.Duration + RulerTenantShardSize(userID string) int } // engineQueryFunc returns a new query function using the rules.EngineQueryFunc function @@ -90,28 +90,26 @@ func engineQueryFunc(engine *promql.Engine, q storage.Queryable, overrides Rules } } -type ManagerFactory = func( - ctx context.Context, - userID string, - notifier *notifier.Manager, - logger log.Logger, - reg prometheus.Registerer, -) *rules.Manager - -func DefaultTenantManagerFactory( - cfg Config, - p Pusher, - q storage.Queryable, - engine *promql.Engine, - overrides RulesLimits, -) ManagerFactory { - return func( - ctx context.Context, - userID string, - notifier *notifier.Manager, - logger log.Logger, - reg prometheus.Registerer, - ) *rules.Manager { +// This interface mimicks rules.Manager API. Interface is used to simplify tests. +type RulesManager interface { + // Starts rules manager. Blocks until Stop is called. + Run() + + // Stops rules manager. 
(Unblocks Run.) + Stop() + + // Updates rules manager state. + Update(interval time.Duration, files []string, externalLabels labels.Labels) error + + // Returns current rules groups. + RuleGroups() []*rules.Group +} + +// ManagerFactory is a function that creates new RulesManager for given user and notifier.Manager. +type ManagerFactory func(ctx context.Context, userID string, notifier *notifier.Manager, logger log.Logger, reg prometheus.Registerer) RulesManager + +func DefaultTenantManagerFactory(cfg Config, p Pusher, q storage.Queryable, engine *promql.Engine, overrides RulesLimits) ManagerFactory { + return func(ctx context.Context, userID string, notifier *notifier.Manager, logger log.Logger, reg prometheus.Registerer) RulesManager { return rules.NewManager(&rules.ManagerOptions{ Appendable: &PusherAppendable{pusher: p, userID: userID}, Queryable: q, diff --git a/pkg/ruler/manager.go b/pkg/ruler/manager.go index 7034e037833..0b08103ed66 100644 --- a/pkg/ruler/manager.go +++ b/pkg/ruler/manager.go @@ -33,7 +33,7 @@ type DefaultMultiTenantManager struct { // Structs for holding per-user Prometheus rules Managers // and a corresponding metrics struct userManagerMtx sync.Mutex - userManagers map[string]*promRules.Manager + userManagers map[string]RulesManager userManagerMetrics *ManagerMetrics // Per-user notifiers with separate queues. @@ -65,7 +65,7 @@ func NewDefaultMultiTenantManager(cfg Config, managerFactory ManagerFactory, reg managerFactory: managerFactory, notifiers: map[string]*rulerNotifier{}, mapper: newMapper(cfg.RulePath, logger), - userManagers: map[string]*promRules.Manager{}, + userManagers: map[string]RulesManager{}, userManagerMetrics: userManagerMetrics, managersTotal: promauto.With(reg).NewGauge(prometheus.GaugeOpts{ Namespace: "cortex", @@ -110,6 +110,7 @@ func (r *DefaultMultiTenantManager) SyncRuleGroups(ctx context.Context, ruleGrou r.lastReloadSuccessful.DeleteLabelValues(userID) r.lastReloadSuccessfulTimestamp.DeleteLabelValues(userID) r.configUpdatesTotal.DeleteLabelValues(userID) + r.userManagerMetrics.DeleteUserRegistry(userID) level.Info(r.logger).Log("msg", "deleting rule manager", "user", userID) } } @@ -129,11 +130,12 @@ func (r *DefaultMultiTenantManager) syncRulesToManager(ctx context.Context, user return } - if update { - level.Debug(r.logger).Log("msg", "updating rules", "user", "user") + manager, exists := r.userManagers[user] + if !exists || update { + level.Debug(r.logger).Log("msg", "updating rules", "user", user) r.configUpdatesTotal.WithLabelValues(user).Inc() - manager, exists := r.userManagers[user] if !exists { + level.Debug(r.logger).Log("msg", "creating rule manager for user", "user", user) manager, err = r.newManager(ctx, user) if err != nil { r.lastReloadSuccessful.WithLabelValues(user).Set(0) @@ -159,7 +161,7 @@ func (r *DefaultMultiTenantManager) syncRulesToManager(ctx context.Context, user // newManager creates a prometheus rule manager wrapped with a user id // configured storage, appendable, notifier, and instrumentation -func (r *DefaultMultiTenantManager) newManager(ctx context.Context, userID string) (*promRules.Manager, error) { +func (r *DefaultMultiTenantManager) newManager(ctx context.Context, userID string) (RulesManager, error) { notifier, err := r.getOrCreateNotifier(userID) if err != nil { return nil, err @@ -239,7 +241,7 @@ func (r *DefaultMultiTenantManager) Stop() { for user, manager := range r.userManagers { level.Debug(r.logger).Log("msg", "shutting down user manager", "user", user) wg.Add(1) - go func(manager 
*promRules.Manager, user string) { + go func(manager RulesManager, user string) { manager.Stop() wg.Done() level.Debug(r.logger).Log("msg", "user manager shut down", "user", user) diff --git a/pkg/ruler/manager_metrics.go b/pkg/ruler/manager_metrics.go index 27aa32f6540..b888a065716 100644 --- a/pkg/ruler/manager_metrics.go +++ b/pkg/ruler/manager_metrics.go @@ -99,8 +99,17 @@ func NewManagerMetrics() *ManagerMetrics { // AddUserRegistry adds a Prometheus registry to the struct func (m *ManagerMetrics) AddUserRegistry(user string, reg *prometheus.Registry) { m.regsMu.Lock() + defer m.regsMu.Unlock() + m.regs[user] = reg - m.regsMu.Unlock() +} + +// DeleteUserRegistry removes user-specific Prometheus registry. +func (m *ManagerMetrics) DeleteUserRegistry(user string) { + m.regsMu.Lock() + defer m.regsMu.Unlock() + + delete(m.regs, user) } // Registries returns a map of prometheus registries managed by the struct @@ -134,6 +143,10 @@ func (m *ManagerMetrics) Describe(out chan<- *prometheus.Desc) { func (m *ManagerMetrics) Collect(out chan<- prometheus.Metric) { data := util.BuildMetricFamiliesPerUserFromUserRegistries(m.Registries()) + // WARNING: It is important that all metrics generated in this method are "Per User". + // Thanks to that we can actually *remove* metrics for given user (see DeleteUserRegistry). + // If same user is later re-added, all metrics will start from 0, which is fine. + data.SendSumOfSummariesPerUser(out, m.EvalDuration, "prometheus_rule_evaluation_duration_seconds") data.SendSumOfSummariesPerUser(out, m.IterationDuration, "cortex_prometheus_rule_group_duration_seconds") diff --git a/pkg/ruler/manager_metrics_test.go b/pkg/ruler/manager_metrics_test.go index 01217cb821d..2686e242351 100644 --- a/pkg/ruler/manager_metrics_test.go +++ b/pkg/ruler/manager_metrics_test.go @@ -7,6 +7,8 @@ import ( "github.com/prometheus/client_golang/prometheus" "github.com/prometheus/client_golang/prometheus/promauto" "github.com/prometheus/client_golang/prometheus/testutil" + dto "github.com/prometheus/client_model/go" + "github.com/stretchr/testify/assert" "github.com/stretchr/testify/require" ) @@ -19,6 +21,9 @@ func TestManagerMetrics(t *testing.T) { managerMetrics.AddUserRegistry("user2", populateManager(10)) managerMetrics.AddUserRegistry("user3", populateManager(100)) + managerMetrics.AddUserRegistry("user4", populateManager(1000)) + managerMetrics.DeleteUserRegistry("user4") + //noinspection ALL err := testutil.GatherAndCompare(mainReg, bytes.NewBufferString(` # HELP cortex_prometheus_rule_evaluation_duration_seconds The duration for a rule to execute. @@ -207,3 +212,46 @@ func newGroupMetrics(r prometheus.Registerer) *groupMetrics { return m } + +func TestMetricsArePerUser(t *testing.T) { + mainReg := prometheus.NewPedanticRegistry() + + managerMetrics := NewManagerMetrics() + mainReg.MustRegister(managerMetrics) + managerMetrics.AddUserRegistry("user1", populateManager(1)) + managerMetrics.AddUserRegistry("user2", populateManager(10)) + managerMetrics.AddUserRegistry("user3", populateManager(100)) + + ch := make(chan prometheus.Metric) + + defer func() { + // drain the channel, so that collecting gouroutine can stop. + // This is useful if test fails. 
+ for range ch { + } + }() + + go func() { + managerMetrics.Collect(ch) + close(ch) + }() + + for m := range ch { + desc := m.Desc() + + dtoM := &dto.Metric{} + err := m.Write(dtoM) + + require.NoError(t, err) + + foundUserLabel := false + for _, l := range dtoM.Label { + if l.GetName() == "user" { + foundUserLabel = true + break + } + } + + assert.True(t, foundUserLabel, "user label not found for metric %s", desc.String()) + } +} diff --git a/pkg/ruler/manager_test.go b/pkg/ruler/manager_test.go new file mode 100644 index 00000000000..2256513a112 --- /dev/null +++ b/pkg/ruler/manager_test.go @@ -0,0 +1,112 @@ +package ruler + +import ( + "context" + "io/ioutil" + "os" + "testing" + "time" + + "github.com/go-kit/kit/log" + "github.com/prometheus/client_golang/prometheus" + "github.com/prometheus/prometheus/notifier" + "github.com/prometheus/prometheus/pkg/labels" + promRules "github.com/prometheus/prometheus/rules" + "github.com/stretchr/testify/require" + "go.uber.org/atomic" + + "github.com/cortexproject/cortex/pkg/ruler/rules" + "github.com/cortexproject/cortex/pkg/util/test" +) + +func TestSyncRuleGroups(t *testing.T) { + dir, err := ioutil.TempDir("", "rules") + require.NoError(t, err) + t.Cleanup(func() { + _ = os.RemoveAll(dir) + }) + + m, err := NewDefaultMultiTenantManager(Config{RulePath: dir}, factory, nil, log.NewNopLogger()) + require.NoError(t, err) + + const user = "testUser" + + userRules := map[string]rules.RuleGroupList{ + user: { + &rules.RuleGroupDesc{ + Name: "group1", + Namespace: "ns", + Interval: 1 * time.Minute, + User: user, + }, + }, + } + m.SyncRuleGroups(context.Background(), userRules) + + mgr := getManager(m, user) + require.NotNil(t, mgr) + + test.Poll(t, 1*time.Second, true, func() interface{} { + return mgr.(*mockRulesManager).running.Load() + }) + + // Passing empty map / nil stops all managers. + m.SyncRuleGroups(context.Background(), nil) + require.Nil(t, getManager(m, user)) + + // Make sure old manager was stopped. + test.Poll(t, 1*time.Second, false, func() interface{} { + return mgr.(*mockRulesManager).running.Load() + }) + + // Resync same rules as before. Previously this didn't restart the manager. 
+ m.SyncRuleGroups(context.Background(), userRules) + + newMgr := getManager(m, user) + require.NotNil(t, newMgr) + require.True(t, mgr != newMgr) + + test.Poll(t, 1*time.Second, true, func() interface{} { + return newMgr.(*mockRulesManager).running.Load() + }) + + m.Stop() + + test.Poll(t, 1*time.Second, false, func() interface{} { + return newMgr.(*mockRulesManager).running.Load() + }) +} + +func getManager(m *DefaultMultiTenantManager, user string) RulesManager { + m.userManagerMtx.Lock() + defer m.userManagerMtx.Unlock() + + return m.userManagers[user] +} + +func factory(_ context.Context, _ string, _ *notifier.Manager, _ log.Logger, _ prometheus.Registerer) RulesManager { + return &mockRulesManager{done: make(chan struct{})} +} + +type mockRulesManager struct { + running atomic.Bool + done chan struct{} +} + +func (m *mockRulesManager) Run() { + m.running.Store(true) + <-m.done +} + +func (m *mockRulesManager) Stop() { + m.running.Store(false) + close(m.done) +} + +func (m *mockRulesManager) Update(_ time.Duration, _ []string, _ labels.Labels) error { + return nil +} + +func (m *mockRulesManager) RuleGroups() []*promRules.Group { + return nil +} diff --git a/pkg/ruler/ruler.go b/pkg/ruler/ruler.go index 76aad94cf5a..3e2bbf86212 100644 --- a/pkg/ruler/ruler.go +++ b/pkg/ruler/ruler.go @@ -9,6 +9,7 @@ import ( "net/url" "path/filepath" "strings" + "sync" "time" "github.com/go-kit/kit/log" @@ -21,6 +22,7 @@ import ( promRules "github.com/prometheus/prometheus/rules" "github.com/prometheus/prometheus/util/strutil" "github.com/weaveworks/common/user" + "golang.org/x/sync/errgroup" "google.golang.org/grpc" "github.com/cortexproject/cortex/pkg/ingester/client" @@ -28,17 +30,31 @@ import ( "github.com/cortexproject/cortex/pkg/ring/kv" "github.com/cortexproject/cortex/pkg/ruler/rules" store "github.com/cortexproject/cortex/pkg/ruler/rules" + "github.com/cortexproject/cortex/pkg/util" "github.com/cortexproject/cortex/pkg/util/flagext" "github.com/cortexproject/cortex/pkg/util/services" "github.com/cortexproject/cortex/pkg/util/tls" + "github.com/cortexproject/cortex/pkg/util/validation" ) var ( - ringCheckErrors = promauto.NewCounter(prometheus.CounterOpts{ - Namespace: "cortex", - Name: "ruler_ring_check_errors_total", - Help: "Number of errors that have occurred when checking the ring for ownership", - }) + supportedShardingStrategies = []string{ShardingStrategyDefault, ShardingStrategyShuffle} + + // Validation errors. + errInvalidShardingStrategy = errors.New("invalid sharding strategy") + errInvalidTenantShardSize = errors.New("invalid tenant shard size, the value must be greater than 0") +) + +const ( + // Supported sharding strategies. + ShardingStrategyDefault = "default" + ShardingStrategyShuffle = "shuffle-sharding" + + loadRulesConcurrency = 10 + + rulerSyncReasonInitial = "initial" + rulerSyncReasonPeriodic = "periodic" + rulerSyncReasonRingChange = "ring-change" ) // Config is the configuration for the recording rules server. @@ -81,15 +97,26 @@ type Config struct { // Enable sharding rule groups. 
EnableSharding bool `yaml:"enable_sharding"` + ShardingStrategy string `yaml:"sharding_strategy"` SearchPendingFor time.Duration `yaml:"search_pending_for"` Ring RingConfig `yaml:"ring"` FlushCheckPeriod time.Duration `yaml:"flush_period"` EnableAPI bool `yaml:"enable_api"` + + RingCheckPeriod time.Duration `yaml:"-"` } // Validate config and returns error on failure -func (cfg *Config) Validate() error { +func (cfg *Config) Validate(limits validation.Limits) error { + if !util.StringsContain(supportedShardingStrategies, cfg.ShardingStrategy) { + return errInvalidShardingStrategy + } + + if cfg.ShardingStrategy == ShardingStrategyShuffle && limits.RulerTenantShardSize <= 0 { + return errInvalidTenantShardSize + } + if err := cfg.StoreConfig.Validate(); err != nil { return errors.Wrap(err, "invalid storage config") } @@ -122,17 +149,21 @@ func (cfg *Config) RegisterFlags(f *flag.FlagSet) { f.DurationVar(&cfg.SearchPendingFor, "ruler.search-pending-for", 5*time.Minute, "Time to spend searching for a pending ruler when shutting down.") f.BoolVar(&cfg.EnableSharding, "ruler.enable-sharding", false, "Distribute rule evaluation using ring backend") + f.StringVar(&cfg.ShardingStrategy, "ruler.sharding-strategy", ShardingStrategyDefault, fmt.Sprintf("The sharding strategy to use. Supported values are: %s.", strings.Join(supportedShardingStrategies, ", "))) f.DurationVar(&cfg.FlushCheckPeriod, "ruler.flush-period", 1*time.Minute, "Period with which to attempt to flush rule groups.") f.StringVar(&cfg.RulePath, "ruler.rule-path", "/rules", "file path to store temporary rule files for the prometheus rule managers") f.BoolVar(&cfg.EnableAPI, "experimental.ruler.enable-api", false, "Enable the ruler api") f.DurationVar(&cfg.OutageTolerance, "ruler.for-outage-tolerance", time.Hour, `Max time to tolerate outage for restoring "for" state of alert.`) f.DurationVar(&cfg.ForGracePeriod, "ruler.for-grace-period", 10*time.Minute, `Minimum duration between alert and restored "for" state. This is maintained only for alerts with configured "for" time greater than grace period.`) f.DurationVar(&cfg.ResendDelay, "ruler.resend-delay", time.Minute, `Minimum amount of time to wait before resending an alert to Alertmanager.`) + + cfg.RingCheckPeriod = 5 * time.Second } // MultiTenantManager is the interface of interaction with a Manager that is tenant aware. type MultiTenantManager interface { // SyncRuleGroups is used to sync the Manager with rules from the RuleStore. + // If existing user is missing in the ruleGroups map, its ruler manager will be stopped. SyncRuleGroups(ctx context.Context, ruleGroups map[string]store.RuleGroupList) // GetRules fetches rules for a particular tenant (userID). GetRules(userID string) []*promRules.Group @@ -177,19 +208,34 @@ type Ruler struct { subservices *services.Manager store rules.RuleStore manager MultiTenantManager + limits RulesLimits + + ringCheckErrors prometheus.Counter + rulerSync *prometheus.CounterVec registry prometheus.Registerer logger log.Logger } // NewRuler creates a new ruler from a distributor and chunk store. 
-func NewRuler(cfg Config, manager MultiTenantManager, reg prometheus.Registerer, logger log.Logger, ruleStore rules.RuleStore) (*Ruler, error) { +func NewRuler(cfg Config, manager MultiTenantManager, reg prometheus.Registerer, logger log.Logger, ruleStore rules.RuleStore, limits RulesLimits) (*Ruler, error) { ruler := &Ruler{ cfg: cfg, store: ruleStore, manager: manager, registry: reg, logger: logger, + limits: limits, + + ringCheckErrors: promauto.With(reg).NewCounter(prometheus.CounterOpts{ + Name: "cortex_ruler_ring_check_errors_total", + Help: "Number of errors that have occurred when checking the ring for ownership", + }), + + rulerSync: promauto.With(reg).NewCounterVec(prometheus.CounterOpts{ + Name: "cortex_ruler_sync_rules_total", + Help: "Total number of times the ruler sync operation triggered.", + }, []string{"reason"}), } if cfg.EnableSharding { @@ -233,7 +279,7 @@ func enableSharding(r *Ruler, ringStore kv.Client) error { return errors.Wrap(err, "failed to initialize ruler's lifecycler") } - r.ring, err = ring.NewWithStoreClientAndStrategy(r.cfg.Ring.ToRingConfig(), rulerRingName, ring.RulerRingKey, ringStore, &ring.DefaultReplicationStrategy{}) + r.ring, err = ring.NewWithStoreClientAndStrategy(r.cfg.Ring.ToRingConfig(), rulerRingName, ring.RulerRingKey, ringStore, rulerReplicationStrategy{}) if err != nil { return errors.Wrap(err, "failed to initialize ruler's ring") } @@ -299,22 +345,30 @@ func SendAlerts(n *notifier.Manager, externalURL string) promRules.NotifyFunc { } } -func (r *Ruler) ownsRule(hash uint32) (bool, error) { - rlrs, err := r.ring.Get(hash, ring.Read, []ring.IngesterDesc{}) - if err != nil { - level.Warn(r.logger).Log("msg", "error reading ring to verify rule group ownership", "err", err) - ringCheckErrors.Inc() - return false, err - } +var sep = []byte("/") + +func tokenForGroup(g *store.RuleGroupDesc) uint32 { + ringHasher := fnv.New32a() - localAddr := r.lifecycler.GetInstanceAddr() + // Hasher never returns err. 
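+ // The token covers user, namespace and group name (separated by "/"), so a given rule group always hashes to the same ring position regardless of which ruler computes it.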
+ _, _ = ringHasher.Write([]byte(g.User)) + _, _ = ringHasher.Write(sep) + _, _ = ringHasher.Write([]byte(g.Namespace)) + _, _ = ringHasher.Write(sep) + _, _ = ringHasher.Write([]byte(g.Name)) + + return ringHasher.Sum32() +} - if rlrs.Ingesters[0].Addr == localAddr { - level.Debug(r.logger).Log("msg", "rule group owned", "owner_addr", rlrs.Ingesters[0].Addr, "addr", localAddr) - return true, nil +func instanceOwnsRuleGroup(r ring.ReadRing, g *rules.RuleGroupDesc, instanceAddr string) (bool, error) { + hash := tokenForGroup(g) + + rlrs, err := r.Get(hash, ring.Ruler, []ring.IngesterDesc{}) + if err != nil { + return false, errors.Wrap(err, "error reading ring to verify rule group ownership") } - level.Debug(r.logger).Log("msg", "rule group not owned, address does not match", "owner_addr", rlrs.Ingesters[0].Addr, "addr", localAddr) - return false, nil + + return rlrs.Ingesters[0].Addr == instanceAddr, nil } func (r *Ruler) ServeHTTP(w http.ResponseWriter, req *http.Request) { @@ -347,59 +401,177 @@ func (r *Ruler) run(ctx context.Context) error { tick := time.NewTicker(r.cfg.PollInterval) defer tick.Stop() - r.loadRules(ctx) + var ringTickerChan <-chan time.Time + var ringLastState ring.ReplicationSet + + if r.cfg.EnableSharding { + ringLastState, _ = r.ring.GetAll(ring.Ruler) + ringTicker := time.NewTicker(util.DurationWithJitter(r.cfg.RingCheckPeriod, 0.2)) + defer ringTicker.Stop() + ringTickerChan = ringTicker.C + } + + r.syncRules(ctx, rulerSyncReasonInitial) for { select { case <-ctx.Done(): return nil case <-tick.C: - r.loadRules(ctx) + r.syncRules(ctx, rulerSyncReasonPeriodic) + case <-ringTickerChan: + // We ignore the error because in case of error it will return an empty + // replication set which we use to compare with the previous state. 
+ currRingState, _ := r.ring.GetAll(ring.Ruler) + + if ring.HasReplicationSetChanged(ringLastState, currRingState) { + ringLastState = currRingState + r.syncRules(ctx, rulerSyncReasonRingChange) + } } } } -func (r *Ruler) loadRules(ctx context.Context) { - ringHasher := fnv.New32a() +func (r *Ruler) syncRules(ctx context.Context, reason string) { + level.Debug(r.logger).Log("msg", "syncing rules", "reason", reason) + r.rulerSync.WithLabelValues(reason).Inc() + + configs, err := r.loadRules(ctx) - configs, err := r.store.ListAllRuleGroups(ctx) if err != nil { - level.Error(r.logger).Log("msg", "unable to poll for rules", "err", err) + level.Error(r.logger).Log("msg", "unable to load rules", "err", err) return } + r.manager.SyncRuleGroups(ctx, configs) +} + +func (r *Ruler) loadRules(ctx context.Context) (map[string]rules.RuleGroupList, error) { + switch { + case !r.cfg.EnableSharding: + return r.loadRulesNoSharding(ctx) + + case r.cfg.ShardingStrategy == ShardingStrategyDefault: + return r.loadRulesShardingDefault(ctx) + + case r.cfg.ShardingStrategy == ShardingStrategyShuffle: + return r.loadRulesShuffleSharding(ctx) + + default: + return nil, errors.New("invalid sharding configuration") + } +} + +func (r *Ruler) loadRulesNoSharding(ctx context.Context) (map[string]rules.RuleGroupList, error) { + return r.store.LoadAllRuleGroups(ctx) +} + +func (r *Ruler) loadRulesShardingDefault(ctx context.Context) (map[string]rules.RuleGroupList, error) { + configs, err := r.store.LoadAllRuleGroups(ctx) + if err != nil { + return nil, err + } + // Iterate through each users configuration and determine if the on-disk // configurations need to be updated filteredConfigs := make(map[string]rules.RuleGroupList) - for userID, cfg := range configs { - filteredConfigs[userID] = store.RuleGroupList{} - - // If sharding is enabled, prune the rule group to only contain rules - // this ruler is responsible for. - if r.cfg.EnableSharding { - for _, g := range cfg { - id := g.User + "/" + g.Namespace + "/" + g.Name - ringHasher.Reset() - _, err = ringHasher.Write([]byte(id)) - if err != nil { - level.Error(r.logger).Log("msg", "failed to create group for user", "user", userID, "namespace", g.Namespace, "group", g.Name, "err", err) - continue - } - hash := ringHasher.Sum32() - owned, err := r.ownsRule(hash) + for userID, groups := range configs { + filtered := filterRuleGroups(userID, groups, r.ring, r.lifecycler.GetInstanceAddr(), r.logger, r.ringCheckErrors) + if len(filtered) > 0 { + filteredConfigs[userID] = filtered + } + } + return filteredConfigs, nil +} + +func (r *Ruler) loadRulesShuffleSharding(ctx context.Context) (map[string]rules.RuleGroupList, error) { + users, err := r.store.ListAllUsers(ctx) + if err != nil { + return nil, errors.Wrap(err, "unable to list users of ruler") + } + + // Only users in userRings will be used in the to load the rules. + userRings := map[string]ring.ReadRing{} + for _, u := range users { + if shardSize := r.limits.RulerTenantShardSize(u); shardSize > 0 { + subRing := r.ring.ShuffleShard(u, shardSize) + + // Include the user only if it belongs to this ruler shard. + if subRing.HasInstance(r.lifecycler.GetInstanceID()) { + userRings[u] = subRing + } + } else { + // A shard size of 0 means shuffle sharding is disabled for this specific user. + // In that case we use the full ring so that rule groups will be sharded across all rulers. 
+ userRings[u] = r.ring + } + } + + if len(userRings) == 0 { + return nil, nil + } + + userCh := make(chan string, len(userRings)) + for u := range userRings { + userCh <- u + } + close(userCh) + + mu := sync.Mutex{} + result := map[string]rules.RuleGroupList{} + + concurrency := loadRulesConcurrency + if len(userRings) < concurrency { + concurrency = len(userRings) + } + + g, gctx := errgroup.WithContext(ctx) + for i := 0; i < concurrency; i++ { + g.Go(func() error { + for userID := range userCh { + groups, err := r.store.LoadRuleGroupsForUserAndNamespace(gctx, userID, "") if err != nil { - level.Error(r.logger).Log("msg", "unable to verify rule group ownership ownership, will retry on the next poll", "err", err) - return + return errors.Wrapf(err, "failed to fetch rule groups for user %s", userID) } - if owned { - filteredConfigs[userID] = append(filteredConfigs[userID], g) + + filtered := filterRuleGroups(userID, groups, userRings[userID], r.lifecycler.GetInstanceAddr(), r.logger, r.ringCheckErrors) + if len(filtered) == 0 { + continue } + + mu.Lock() + result[userID] = filtered + mu.Unlock() } + return nil + }) + } + + err = g.Wait() + return result, err +} + +// Reason why this function is not a method on Ruler is to make sure we don't accidentally use r.ring, +// but only ring passed as parameter. +func filterRuleGroups(userID string, ruleGroups []*store.RuleGroupDesc, ring ring.ReadRing, instanceAddr string, log log.Logger, ringCheckErrors prometheus.Counter) []*store.RuleGroupDesc { + // Prune the rule group to only contain rules that this ruler is responsible for, based on ring. + var result []*rules.RuleGroupDesc + for _, g := range ruleGroups { + owned, err := instanceOwnsRuleGroup(ring, g, instanceAddr) + if err != nil { + ringCheckErrors.Inc() + level.Error(log).Log("msg", "failed to create group for user", "user", userID, "namespace", g.Namespace, "group", g.Name, "err", err) + continue + } + + if owned { + level.Debug(log).Log("msg", "rule group owned", "user", g.User, "namespace", g.Namespace, "name", g.Name) + result = append(result, g) } else { - filteredConfigs[userID] = cfg + level.Debug(log).Log("msg", "rule group not owned, ignoring", "user", g.User, "namespace", g.Namespace, "name", g.Name) } } - r.manager.SyncRuleGroups(ctx, filteredConfigs) + return result } // GetRules retrieves the running rules from this ruler and all running rulers in the ring if @@ -505,7 +677,7 @@ func (r *Ruler) getLocalRules(userID string) ([]*GroupStateDesc, error) { } func (r *Ruler) getShardedRules(ctx context.Context) ([]*GroupStateDesc, error) { - rulers, err := r.ring.GetAll(ring.Read) + rulers, err := r.ring.GetAll(ring.Ruler) if err != nil { return nil, err } diff --git a/pkg/ruler/ruler_replication_strategy.go b/pkg/ruler/ruler_replication_strategy.go new file mode 100644 index 00000000000..0d16572fef1 --- /dev/null +++ b/pkg/ruler/ruler_replication_strategy.go @@ -0,0 +1,37 @@ +package ruler + +import ( + "time" + + "github.com/pkg/errors" + + "github.com/cortexproject/cortex/pkg/ring" +) + +type rulerReplicationStrategy struct { +} + +func (r rulerReplicationStrategy) Filter(instances []ring.IngesterDesc, op ring.Operation, _ int, heartbeatTimeout time.Duration, _ bool) (healthy []ring.IngesterDesc, maxFailures int, err error) { + // Filter out unhealthy instances. + for i := 0; i < len(instances); { + if instances[i].IsHealthy(op, heartbeatTimeout) { + i++ + } else { + instances = append(instances[:i], instances[i+1:]...) 
+ } + } + + if len(instances) == 0 { + return nil, 0, errors.New("no healthy ruler instance found for the replication set") + } + + return instances, len(instances) - 1, nil +} + +func (r rulerReplicationStrategy) ShouldExtendReplicaSet(instance ring.IngesterDesc, op ring.Operation) bool { + // Only ACTIVE rulers get any rule groups. If instance is not ACTIVE, we need to find another ruler. + if op == ring.Ruler && instance.GetState() != ring.ACTIVE { + return true + } + return false +} diff --git a/pkg/ruler/ruler_ring.go b/pkg/ruler/ruler_ring.go index 979a72f8ed1..9948e2a8c7a 100644 --- a/pkg/ruler/ruler_ring.go +++ b/pkg/ruler/ruler_ring.go @@ -91,6 +91,8 @@ func (cfg *RingConfig) ToRingConfig() ring.Config { rc.KVStore = cfg.KVStore rc.HeartbeatTimeout = cfg.HeartbeatTimeout + + // Each rule group is loaded to *exactly* one ruler. rc.ReplicationFactor = 1 return rc diff --git a/pkg/ruler/ruler_test.go b/pkg/ruler/ruler_test.go index 4f0f0497277..bf4f94f6e1b 100644 --- a/pkg/ruler/ruler_test.go +++ b/pkg/ruler/ruler_test.go @@ -3,9 +3,12 @@ package ruler import ( "context" "io/ioutil" + "math/rand" "net/http" "net/http/httptest" "os" + "path/filepath" + "sort" "strings" "sync" "testing" @@ -27,6 +30,7 @@ import ( "github.com/cortexproject/cortex/pkg/ingester/client" "github.com/cortexproject/cortex/pkg/ring" + "github.com/cortexproject/cortex/pkg/ring/kv" "github.com/cortexproject/cortex/pkg/ring/kv/consul" "github.com/cortexproject/cortex/pkg/ruler/rules" "github.com/cortexproject/cortex/pkg/util" @@ -58,14 +62,21 @@ func defaultRulerConfig(store rules.RuleStore) (Config, func()) { return cfg, cleanup } -type ruleLimits time.Duration +type ruleLimits struct { + evalDelay time.Duration + tenantShard int +} func (r ruleLimits) EvaluationDelay(_ string) time.Duration { - return time.Duration(r) + return r.evalDelay +} + +func (r ruleLimits) RulerTenantShardSize(_ string) int { + return r.tenantShard } func testSetup(t *testing.T, cfg Config) (*promql.Engine, storage.QueryableFunc, Pusher, log.Logger, RulesLimits, func()) { - dir, err := ioutil.TempDir("", t.Name()) + dir, err := ioutil.TempDir("", filepath.Base(t.Name())) testutil.Ok(t, err) cleanup := func() { os.RemoveAll(dir) @@ -90,7 +101,7 @@ func testSetup(t *testing.T, cfg Config) (*promql.Engine, storage.QueryableFunc, l := log.NewLogfmtLogger(os.Stdout) l = level.NewFilter(l, level.AllowInfo()) - return engine, noopQueryable, pusher, l, ruleLimits(0), cleanup + return engine, noopQueryable, pusher, l, ruleLimits{evalDelay: 0}, cleanup } func newManager(t *testing.T, cfg Config) (*DefaultMultiTenantManager, func()) { @@ -117,6 +128,7 @@ func newRuler(t *testing.T, cfg Config) (*Ruler, func()) { reg, logger, storage, + overrides, ) require.NoError(t, err) @@ -128,7 +140,7 @@ func newTestRuler(t *testing.T, cfg Config) (*Ruler, func()) { require.NoError(t, services.StartAndAwaitRunning(context.Background(), ruler)) // Ensure all rules are loaded before usage - ruler.loadRules(context.Background()) + ruler.syncRules(context.Background(), rulerSyncReasonInitial) return ruler, cleanup } @@ -216,3 +228,374 @@ func compareRuleGroupDescToStateDesc(t *testing.T, expected *rules.RuleGroupDesc require.Equal(t, expected.Rules[i].Alert, got.ActiveRules[i].Rule.Alert) } } + +func TestSharding(t *testing.T) { + const ( + user1 = "user1" + user2 = "user2" + user3 = "user3" + ) + + user1Group1 := &rules.RuleGroupDesc{User: user1, Namespace: "namespace", Name: "first"} + user1Group2 := &rules.RuleGroupDesc{User: user1, Namespace: "namespace", 
Name: "second"} + user2Group1 := &rules.RuleGroupDesc{User: user2, Namespace: "namespace", Name: "first"} + user3Group1 := &rules.RuleGroupDesc{User: user3, Namespace: "namespace", Name: "first"} + + // Must be distinct for test to work. + user1Group1Token := tokenForGroup(user1Group1) + user1Group2Token := tokenForGroup(user1Group2) + user2Group1Token := tokenForGroup(user2Group1) + user3Group1Token := tokenForGroup(user3Group1) + + noRules := map[string]rules.RuleGroupList{} + allRules := map[string]rules.RuleGroupList{ + user1: {user1Group1, user1Group2}, + user2: {user2Group1}, + user3: {user3Group1}, + } + + // ruler ID -> (user ID -> list of groups). + type expectedRulesMap map[string]map[string]rules.RuleGroupList + + type testCase struct { + sharding bool + shardingStrategy string + shuffleShardSize int + setupRing func(*ring.Desc) + + expectedRules expectedRulesMap + } + + const ( + ruler1 = "ruler-1" + ruler1Host = "1.1.1.1" + ruler1Port = 9999 + ruler1Addr = "1.1.1.1:9999" + + ruler2 = "ruler-2" + ruler2Host = "2.2.2.2" + ruler2Port = 9999 + ruler2Addr = "2.2.2.2:9999" + + ruler3 = "ruler-3" + ruler3Host = "3.3.3.3" + ruler3Port = 9999 + ruler3Addr = "3.3.3.3:9999" + ) + + testCases := map[string]testCase{ + "no sharding": { + sharding: false, + expectedRules: expectedRulesMap{ruler1: allRules}, + }, + + "default sharding, single ruler": { + sharding: true, + shardingStrategy: ShardingStrategyDefault, + setupRing: func(desc *ring.Desc) { + desc.AddIngester(ruler1, ruler1Addr, "", []uint32{0}, ring.ACTIVE, time.Now()) + }, + expectedRules: expectedRulesMap{ruler1: allRules}, + }, + + "default sharding, multiple ACTIVE rulers": { + sharding: true, + shardingStrategy: ShardingStrategyDefault, + setupRing: func(desc *ring.Desc) { + desc.AddIngester(ruler1, ruler1Addr, "", sortTokens([]uint32{user1Group1Token + 1, user2Group1Token + 1}), ring.ACTIVE, time.Now()) + desc.AddIngester(ruler2, ruler2Addr, "", sortTokens([]uint32{user1Group2Token + 1, user3Group1Token + 1}), ring.ACTIVE, time.Now()) + }, + + expectedRules: expectedRulesMap{ + ruler1: map[string]rules.RuleGroupList{ + user1: {user1Group1}, + user2: {user2Group1}, + }, + + ruler2: map[string]rules.RuleGroupList{ + user1: {user1Group2}, + user3: {user3Group1}, + }, + }, + }, + + "default sharding, unhealthy ACTIVE ruler": { + sharding: true, + shardingStrategy: ShardingStrategyDefault, + + setupRing: func(desc *ring.Desc) { + desc.AddIngester(ruler1, ruler1Addr, "", sortTokens([]uint32{user1Group1Token + 1, user2Group1Token + 1}), ring.ACTIVE, time.Now()) + desc.Ingesters[ruler2] = ring.IngesterDesc{ + Addr: ruler2Addr, + Timestamp: time.Now().Add(-time.Hour).Unix(), + State: ring.ACTIVE, + Tokens: sortTokens([]uint32{user1Group2Token + 1, user3Group1Token + 1}), + } + }, + + expectedRules: expectedRulesMap{ + // This ruler doesn't get rules from unhealthy ruler (RF=1). + ruler1: map[string]rules.RuleGroupList{ + user1: {user1Group1}, + user2: {user2Group1}, + }, + ruler2: noRules, + }, + }, + + "default sharding, LEAVING ruler": { + sharding: true, + shardingStrategy: ShardingStrategyDefault, + + setupRing: func(desc *ring.Desc) { + desc.AddIngester(ruler1, ruler1Addr, "", sortTokens([]uint32{user1Group1Token + 1, user2Group1Token + 1}), ring.LEAVING, time.Now()) + desc.AddIngester(ruler2, ruler2Addr, "", sortTokens([]uint32{user1Group2Token + 1, user3Group1Token + 1}), ring.ACTIVE, time.Now()) + }, + + expectedRules: expectedRulesMap{ + // LEAVING ruler doesn't get any rules. 
+ ruler1: noRules, + ruler2: allRules, + }, + }, + + "default sharding, JOINING ruler": { + sharding: true, + shardingStrategy: ShardingStrategyDefault, + + setupRing: func(desc *ring.Desc) { + desc.AddIngester(ruler1, ruler1Addr, "", sortTokens([]uint32{user1Group1Token + 1, user2Group1Token + 1}), ring.JOINING, time.Now()) + desc.AddIngester(ruler2, ruler2Addr, "", sortTokens([]uint32{user1Group2Token + 1, user3Group1Token + 1}), ring.ACTIVE, time.Now()) + }, + + expectedRules: expectedRulesMap{ + // JOINING ruler has no rules yet. + ruler1: noRules, + ruler2: allRules, + }, + }, + + "shuffle sharding, single ruler": { + sharding: true, + shardingStrategy: ShardingStrategyShuffle, + + setupRing: func(desc *ring.Desc) { + desc.AddIngester(ruler1, ruler1Addr, "", sortTokens([]uint32{0}), ring.ACTIVE, time.Now()) + }, + + expectedRules: expectedRulesMap{ + ruler1: allRules, + }, + }, + + "shuffle sharding, multiple rulers, shard size 1": { + sharding: true, + shardingStrategy: ShardingStrategyShuffle, + shuffleShardSize: 1, + + setupRing: func(desc *ring.Desc) { + desc.AddIngester(ruler1, ruler1Addr, "", sortTokens([]uint32{userToken(user1, 0) + 1, userToken(user2, 0) + 1, userToken(user3, 0) + 1}), ring.ACTIVE, time.Now()) + desc.AddIngester(ruler2, ruler2Addr, "", sortTokens([]uint32{user1Group1Token + 1, user1Group2Token + 1, user2Group1Token + 1, user3Group1Token + 1}), ring.ACTIVE, time.Now()) + }, + + expectedRules: expectedRulesMap{ + ruler1: allRules, + ruler2: noRules, + }, + }, + + // Same test as previous one, but with shard size=2. Second ruler gets all the rules. + "shuffle sharding, two rulers, shard size 2": { + sharding: true, + shardingStrategy: ShardingStrategyShuffle, + shuffleShardSize: 2, + + setupRing: func(desc *ring.Desc) { + // Exact same tokens setup as previous test. 
+ desc.AddIngester(ruler1, ruler1Addr, "", sortTokens([]uint32{userToken(user1, 0) + 1, userToken(user2, 0) + 1, userToken(user3, 0) + 1}), ring.ACTIVE, time.Now()) + desc.AddIngester(ruler2, ruler2Addr, "", sortTokens([]uint32{user1Group1Token + 1, user1Group2Token + 1, user2Group1Token + 1, user3Group1Token + 1}), ring.ACTIVE, time.Now()) + }, + + expectedRules: expectedRulesMap{ + ruler1: noRules, + ruler2: allRules, + }, + }, + + "shuffle sharding, two rulers, shard size 1, distributed users": { + sharding: true, + shardingStrategy: ShardingStrategyShuffle, + shuffleShardSize: 1, + + setupRing: func(desc *ring.Desc) { + desc.AddIngester(ruler1, ruler1Addr, "", sortTokens([]uint32{userToken(user1, 0) + 1}), ring.ACTIVE, time.Now()) + desc.AddIngester(ruler2, ruler2Addr, "", sortTokens([]uint32{userToken(user2, 0) + 1, userToken(user3, 0) + 1}), ring.ACTIVE, time.Now()) + }, + + expectedRules: expectedRulesMap{ + ruler1: map[string]rules.RuleGroupList{ + user1: {user1Group1, user1Group2}, + }, + ruler2: map[string]rules.RuleGroupList{ + user2: {user2Group1}, + user3: {user3Group1}, + }, + }, + }, + "shuffle sharding, three rulers, shard size 2": { + sharding: true, + shardingStrategy: ShardingStrategyShuffle, + shuffleShardSize: 2, + + setupRing: func(desc *ring.Desc) { + desc.AddIngester(ruler1, ruler1Addr, "", sortTokens([]uint32{userToken(user1, 0) + 1, user1Group1Token + 1}), ring.ACTIVE, time.Now()) + desc.AddIngester(ruler2, ruler2Addr, "", sortTokens([]uint32{userToken(user1, 1) + 1, user1Group2Token + 1, userToken(user2, 1) + 1, userToken(user3, 1) + 1}), ring.ACTIVE, time.Now()) + desc.AddIngester(ruler3, ruler3Addr, "", sortTokens([]uint32{userToken(user2, 0) + 1, userToken(user3, 0) + 1, user2Group1Token + 1, user3Group1Token + 1}), ring.ACTIVE, time.Now()) + }, + + expectedRules: expectedRulesMap{ + ruler1: map[string]rules.RuleGroupList{ + user1: {user1Group1}, + }, + ruler2: map[string]rules.RuleGroupList{ + user1: {user1Group2}, + }, + ruler3: map[string]rules.RuleGroupList{ + user2: {user2Group1}, + user3: {user3Group1}, + }, + }, + }, + "shuffle sharding, three rulers, shard size 2, ruler2 has no users": { + sharding: true, + shardingStrategy: ShardingStrategyShuffle, + shuffleShardSize: 2, + + setupRing: func(desc *ring.Desc) { + desc.AddIngester(ruler1, ruler1Addr, "", sortTokens([]uint32{userToken(user1, 0) + 1, userToken(user2, 1) + 1, user1Group1Token + 1, user1Group2Token + 1}), ring.ACTIVE, time.Now()) + desc.AddIngester(ruler2, ruler2Addr, "", sortTokens([]uint32{userToken(user1, 1) + 1, userToken(user3, 1) + 1, user2Group1Token + 1}), ring.ACTIVE, time.Now()) + desc.AddIngester(ruler3, ruler3Addr, "", sortTokens([]uint32{userToken(user2, 0) + 1, userToken(user3, 0) + 1, user3Group1Token + 1}), ring.ACTIVE, time.Now()) + }, + + expectedRules: expectedRulesMap{ + ruler1: map[string]rules.RuleGroupList{ + user1: {user1Group1, user1Group2}, + }, + ruler2: noRules, // Ruler2 owns token for user2group1, but user-2 will only be handled by ruler-1 and 3. 
+ ruler3: map[string]rules.RuleGroupList{ + user2: {user2Group1}, + user3: {user3Group1}, + }, + }, + }, + } + + for name, tc := range testCases { + t.Run(name, func(t *testing.T) { + kvStore := consul.NewInMemoryClient(ring.GetCodec()) + + setupRuler := func(id string, host string, port int, forceRing *ring.Ring) *Ruler { + cfg := Config{ + StoreConfig: RuleStoreConfig{mock: newMockRuleStore(allRules)}, + EnableSharding: tc.sharding, + ShardingStrategy: tc.shardingStrategy, + Ring: RingConfig{ + InstanceID: id, + InstanceAddr: host, + InstancePort: port, + KVStore: kv.Config{ + Mock: kvStore, + }, + HeartbeatTimeout: 1 * time.Minute, + }, + FlushCheckPeriod: 0, + } + + r, cleanup := newRuler(t, cfg) + r.limits = ruleLimits{evalDelay: 0, tenantShard: tc.shuffleShardSize} + t.Cleanup(cleanup) + + if forceRing != nil { + r.ring = forceRing + } + return r + } + + r1 := setupRuler(ruler1, ruler1Host, ruler1Port, nil) + + rulerRing := r1.ring + + // We start ruler's ring, but nothing else (not even lifecycler). + if rulerRing != nil { + require.NoError(t, services.StartAndAwaitRunning(context.Background(), rulerRing)) + t.Cleanup(rulerRing.StopAsync) + } + + var r2, r3 *Ruler + if rulerRing != nil { + // Reuse ring from r1. + r2 = setupRuler(ruler2, ruler2Host, ruler2Port, rulerRing) + r3 = setupRuler(ruler3, ruler3Host, ruler3Port, rulerRing) + } + + if tc.setupRing != nil { + err := kvStore.CAS(context.Background(), ring.RulerRingKey, func(in interface{}) (out interface{}, retry bool, err error) { + d, _ := in.(*ring.Desc) + if d == nil { + d = ring.NewDesc() + } + + tc.setupRing(d) + + return d, true, nil + }) + require.NoError(t, err) + // Wait a bit to make sure ruler's ring is updated. + time.Sleep(100 * time.Millisecond) + } + + // Always add ruler1 to expected rulers, even if there is no ring (no sharding). + loadedRules1, err := r1.loadRules(context.Background()) + require.NoError(t, err) + + expected := expectedRulesMap{ + ruler1: loadedRules1, + } + + addToExpected := func(id string, r *Ruler) { + // Only expect rules from other rulers when using ring, and they are present in the ring. + if r != nil && rulerRing != nil && rulerRing.HasInstance(id) { + loaded, err := r.loadRules(context.Background()) + require.NoError(t, err) + // Normalize nil map to empty one. + if loaded == nil { + loaded = map[string]rules.RuleGroupList{} + } + expected[id] = loaded + } + } + + addToExpected(ruler2, r2) + addToExpected(ruler3, r3) + + require.Equal(t, tc.expectedRules, expected) + }) + } +} + +// User shuffle shard token. 
+func userToken(user string, skip int) uint32 { + r := rand.New(rand.NewSource(util.ShuffleShardSeed(user))) + + for ; skip > 0; skip-- { + _ = r.Uint32() + } + return r.Uint32() +} + +func sortTokens(tokens []uint32) []uint32 { + sort.Slice(tokens, func(i, j int) bool { + return tokens[i] < tokens[j] + }) + return tokens +} diff --git a/pkg/ruler/rules/local/local.go b/pkg/ruler/rules/local/local.go index bc7f4cc008d..867b2e3df9f 100644 --- a/pkg/ruler/rules/local/local.go +++ b/pkg/ruler/rules/local/local.go @@ -40,16 +40,14 @@ func NewLocalRulesClient(cfg Config, loader promRules.GroupLoader) (*Client, err }, nil } -// ListAllRuleGroups implements RuleStore -func (l *Client) ListAllRuleGroups(ctx context.Context) (map[string]rules.RuleGroupList, error) { - lists := make(map[string]rules.RuleGroupList) - +func (l *Client) ListAllUsers(ctx context.Context) ([]string, error) { root := l.cfg.Directory infos, err := ioutil.ReadDir(root) if err != nil { return nil, errors.Wrapf(err, "unable to read dir %s", root) } + var result []string for _, info := range infos { // After resolving link, info.Name() may be different than user, so keep original name. user := info.Name() @@ -62,11 +60,24 @@ func (l *Client) ListAllRuleGroups(ctx context.Context) (map[string]rules.RuleGr } } - if !info.IsDir() { - continue + if info.IsDir() { + result = append(result, user) } + } - list, err := l.listAllRulesGroupsForUser(ctx, user) + return result, nil +} + +// LoadAllRuleGroups implements rules.RuleStore +func (l *Client) LoadAllRuleGroups(ctx context.Context) (map[string]rules.RuleGroupList, error) { + users, err := l.ListAllUsers(ctx) + if err != nil { + return nil, err + } + + lists := make(map[string]rules.RuleGroupList) + for _, user := range users { + list, err := l.loadAllRulesGroupsForUser(ctx, user) if err != nil { return nil, errors.Wrapf(err, "failed to list rule groups for user %s", user) } @@ -77,13 +88,13 @@ func (l *Client) ListAllRuleGroups(ctx context.Context) (map[string]rules.RuleGr return lists, nil } -// ListRuleGroups implements RuleStore -func (l *Client) ListRuleGroups(ctx context.Context, userID string, namespace string) (rules.RuleGroupList, error) { +// LoadRuleGroupsForUserAndNamespace implements rules.RuleStore +func (l *Client) LoadRuleGroupsForUserAndNamespace(ctx context.Context, userID string, namespace string) (rules.RuleGroupList, error) { if namespace != "" { return l.listAllRulesGroupsForUserAndNamespace(ctx, userID, namespace) } - return l.listAllRulesGroupsForUser(ctx, userID) + return l.loadAllRulesGroupsForUser(ctx, userID) } // GetRuleGroup implements RuleStore @@ -106,7 +117,7 @@ func (l *Client) DeleteNamespace(ctx context.Context, userID, namespace string) return errors.New("DeleteNamespace unsupported in rule local store") } -func (l *Client) listAllRulesGroupsForUser(ctx context.Context, userID string) (rules.RuleGroupList, error) { +func (l *Client) loadAllRulesGroupsForUser(ctx context.Context, userID string) (rules.RuleGroupList, error) { var allLists rules.RuleGroupList root := filepath.Join(l.cfg.Directory, userID) diff --git a/pkg/ruler/rules/local/local_test.go b/pkg/ruler/rules/local/local_test.go index 240fe448baf..8e330dab953 100644 --- a/pkg/ruler/rules/local/local_test.go +++ b/pkg/ruler/rules/local/local_test.go @@ -17,7 +17,7 @@ import ( "github.com/cortexproject/cortex/pkg/ruler/rules" ) -func TestClient_ListAllRuleGroups(t *testing.T) { +func TestClient_LoadAllRuleGroups(t *testing.T) { user1 := "user" user2 := "second-user" @@ -73,7 +73,7 @@ 
func TestClient_ListAllRuleGroups(t *testing.T) { require.NoError(t, err) ctx := context.Background() - userMap, err := client.ListAllRuleGroups(ctx) + userMap, err := client.LoadAllRuleGroups(ctx) require.NoError(t, err) for _, u := range []string{user1, user2} { diff --git a/pkg/ruler/rules/objectclient/rule_store.go b/pkg/ruler/rules/objectclient/rule_store.go index 834f82d7806..74ad8792ac7 100644 --- a/pkg/ruler/rules/objectclient/rule_store.go +++ b/pkg/ruler/rules/objectclient/rule_store.go @@ -5,11 +5,11 @@ import ( "context" "encoding/base64" "io/ioutil" - strings "strings" + "strings" "sync" "github.com/go-kit/kit/log/level" - proto "github.com/gogo/protobuf/proto" + "github.com/gogo/protobuf/proto" "golang.org/x/sync/errgroup" "github.com/cortexproject/cortex/pkg/chunk" @@ -27,8 +27,8 @@ import ( // across all backends const ( - rulePrefix = "rules/" - + delim = "/" + rulePrefix = "rules" + delim loadRuleGroupsConcurrency = 4 ) @@ -54,7 +54,7 @@ func (o *RuleStore) getRuleGroup(ctx context.Context, objectKey string) (*rules. if err != nil { return nil, err } - defer reader.Close() + defer func() { _ = reader.Close() }() buf, err := ioutil.ReadAll(reader) if err != nil { @@ -71,8 +71,30 @@ func (o *RuleStore) getRuleGroup(ctx context.Context, objectKey string) (*rules. return rg, nil } -// ListAllRuleGroups returns all the active rule groups -func (o *RuleStore) ListAllRuleGroups(ctx context.Context) (map[string]rules.RuleGroupList, error) { +func (o *RuleStore) ListAllUsers(ctx context.Context) ([]string, error) { + _, prefixes, err := o.client.List(ctx, rulePrefix, delim) + if err != nil { + return nil, err + } + + var result []string + for _, p := range prefixes { + s := string(p) + + s = strings.TrimPrefix(s, rulePrefix) + s = strings.TrimSuffix(s, delim) + + if s != "" { + result = append(result, s) + } + } + + return result, nil +} + +// LoadAllRuleGroups implements rules.RuleStore. +func (o *RuleStore) LoadAllRuleGroups(ctx context.Context) (map[string]rules.RuleGroupList, error) { + // No delimiter to get *all* rule groups for all users and namespaces. 
ruleGroupObjects, _, err := o.client.List(ctx, generateRuleObjectKey("", "", ""), "") if err != nil { return nil, err @@ -85,8 +107,7 @@ func (o *RuleStore) ListAllRuleGroups(ctx context.Context) (map[string]rules.Rul return o.loadRuleGroupsConcurrently(ctx, ruleGroupObjects) } -// ListRuleGroups returns all the active rule groups for a user -func (o *RuleStore) ListRuleGroups(ctx context.Context, userID, namespace string) (rules.RuleGroupList, error) { +func (o *RuleStore) LoadRuleGroupsForUserAndNamespace(ctx context.Context, userID, namespace string) (rules.RuleGroupList, error) { ruleGroupObjects, _, err := o.client.List(ctx, generateRuleObjectKey(userID, namespace, ""), "") if err != nil { return nil, err @@ -203,22 +224,22 @@ func (o *RuleStore) loadRuleGroupsConcurrently(ctx context.Context, rgObjects [] return result, err } -func generateRuleObjectKey(id, namespace, name string) string { - if id == "" { +func generateRuleObjectKey(userID, namespace, groupName string) string { + if userID == "" { return rulePrefix } - prefix := rulePrefix + id + "/" + prefix := rulePrefix + userID + delim if namespace == "" { return prefix } - ns := base64.URLEncoding.EncodeToString([]byte(namespace)) + "/" - if name == "" { + ns := base64.URLEncoding.EncodeToString([]byte(namespace)) + delim + if groupName == "" { return prefix + ns } - return prefix + ns + base64.URLEncoding.EncodeToString([]byte(name)) + return prefix + ns + base64.URLEncoding.EncodeToString([]byte(groupName)) } func decomposeRuleObjectKey(handle string) string { diff --git a/pkg/ruler/rules/store.go b/pkg/ruler/rules/store.go index 143ce08aa9a..1884e46e0e3 100644 --- a/pkg/ruler/rules/store.go +++ b/pkg/ruler/rules/store.go @@ -21,8 +21,14 @@ var ( // RuleStore is used to store and retrieve rules type RuleStore interface { - ListAllRuleGroups(ctx context.Context) (map[string]RuleGroupList, error) - ListRuleGroups(ctx context.Context, userID string, namespace string) (RuleGroupList, error) + ListAllUsers(ctx context.Context) ([]string, error) + + // Returns all rule groups, and loads rules for each group. + LoadAllRuleGroups(ctx context.Context) (map[string]RuleGroupList, error) + + // LoadRuleGroupsForUserAndNamespace returns all the active rule groups for a user from given namespace. + // If namespace is empty, groups from all namespaces are returned. + LoadRuleGroupsForUserAndNamespace(ctx context.Context, userID string, namespace string) (RuleGroupList, error) GetRuleGroup(ctx context.Context, userID, namespace, group string) (*RuleGroupDesc, error) SetRuleGroup(ctx context.Context, userID, namespace string, group *RuleGroupDesc) error DeleteRuleGroup(ctx context.Context, userID, namespace string, group string) error @@ -63,9 +69,20 @@ func NewConfigRuleStore(c client.Client) *ConfigRuleStore { } } -// ListAllRuleGroups implements RuleStore -func (c *ConfigRuleStore) ListAllRuleGroups(ctx context.Context) (map[string]RuleGroupList, error) { +func (c *ConfigRuleStore) ListAllUsers(ctx context.Context) ([]string, error) { + m, err := c.LoadAllRuleGroups(ctx) + + // TODO: this should be optimized, if possible. 
+ result := []string(nil) + for u := range m { + result = append(result, u) + } + + return result, err +} +// LoadAllRuleGroups implements RuleStore +func (c *ConfigRuleStore) LoadAllRuleGroups(ctx context.Context) (map[string]RuleGroupList, error) { configs, err := c.configClient.GetRules(ctx, c.since) if err != nil { @@ -90,10 +107,6 @@ func (c *ConfigRuleStore) ListAllRuleGroups(ctx context.Context) (map[string]Rul c.ruleGroupList[user] = userRules } - if err != nil { - return nil, err - } - c.since = getLatestConfigID(configs, c.since) return c.ruleGroupList, nil @@ -111,9 +124,26 @@ func getLatestConfigID(cfgs map[string]userconfig.VersionedRulesConfig, latest u return ret } -// ListRuleGroups is not implemented -func (c *ConfigRuleStore) ListRuleGroups(ctx context.Context, userID string, namespace string) (RuleGroupList, error) { - return nil, errors.New("not implemented by the config service rule store") +func (c *ConfigRuleStore) LoadRuleGroupsForUserAndNamespace(ctx context.Context, userID string, namespace string) (RuleGroupList, error) { + r, err := c.LoadAllRuleGroups(ctx) + if err != nil { + return nil, err + } + + if namespace == "" { + return r[userID], nil + } + + list := r[userID] + for ix := 0; ix < len(list); { + if list[ix].GetNamespace() != namespace { + list = append(list[:ix], list[ix+1:]...) + } else { + ix++ + } + } + + return list, nil } // GetRuleGroup is not implemented diff --git a/pkg/ruler/rules/store_test.go b/pkg/ruler/rules/store_test.go index 775cdab212a..c5f7f9b3579 100644 --- a/pkg/ruler/rules/store_test.go +++ b/pkg/ruler/rules/store_test.go @@ -35,7 +35,7 @@ func Test_ConfigRuleStoreError(t *testing.T) { } store := NewConfigRuleStore(mock) - _, err := store.ListAllRuleGroups(context.Background()) + _, err := store.LoadAllRuleGroups(context.Background()) assert.Equal(t, mock.err, err, "Unexpected error returned") } @@ -54,7 +54,7 @@ func Test_ConfigRuleStoreReturn(t *testing.T) { } store := NewConfigRuleStore(mock) - rules, _ := store.ListAllRuleGroups(context.Background()) + rules, _ := store.LoadAllRuleGroups(context.Background()) assert.Equal(t, 1, len(rules["user"])) assert.Equal(t, id, store.since) @@ -73,7 +73,7 @@ func Test_ConfigRuleStoreDelete(t *testing.T) { } store := NewConfigRuleStore(mock) - _, _ = store.ListAllRuleGroups(context.Background()) + _, _ = store.LoadAllRuleGroups(context.Background()) mock.cfgs["user"] = userconfig.VersionedRulesConfig{ ID: 1, @@ -81,7 +81,7 @@ func Test_ConfigRuleStoreDelete(t *testing.T) { DeletedAt: time.Unix(0, 1), } - rules, _ := store.ListAllRuleGroups(context.Background()) + rules, _ := store.LoadAllRuleGroups(context.Background()) assert.Equal(t, 0, len(rules["user"])) } @@ -99,7 +99,7 @@ func Test_ConfigRuleStoreAppend(t *testing.T) { } store := NewConfigRuleStore(mock) - _, _ = store.ListAllRuleGroups(context.Background()) + _, _ = store.LoadAllRuleGroups(context.Background()) delete(mock.cfgs, "user") mock.cfgs["user2"] = userconfig.VersionedRulesConfig{ @@ -108,7 +108,7 @@ func Test_ConfigRuleStoreAppend(t *testing.T) { DeletedAt: zeroTime, } - rules, _ := store.ListAllRuleGroups(context.Background()) + rules, _ := store.LoadAllRuleGroups(context.Background()) assert.Equal(t, 2, len(rules)) } @@ -136,7 +136,7 @@ func Test_ConfigRuleStoreSinceSet(t *testing.T) { } store := NewConfigRuleStore(mock) - _, _ = store.ListAllRuleGroups(context.Background()) + _, _ = store.LoadAllRuleGroups(context.Background()) assert.Equal(t, userconfig.ID(100), store.since) delete(mock.cfgs, "user") @@ -147,7 +147,7 @@ 
func Test_ConfigRuleStoreSinceSet(t *testing.T) { DeletedAt: zeroTime, } - _, _ = store.ListAllRuleGroups(context.Background()) + _, _ = store.LoadAllRuleGroups(context.Background()) assert.Equal(t, userconfig.ID(100), store.since) mock.cfgs["user2"] = userconfig.VersionedRulesConfig{ @@ -156,7 +156,7 @@ func Test_ConfigRuleStoreSinceSet(t *testing.T) { DeletedAt: zeroTime, } - _, _ = store.ListAllRuleGroups(context.Background()) + _, _ = store.LoadAllRuleGroups(context.Background()) assert.Equal(t, userconfig.ID(101), store.since) } diff --git a/pkg/ruler/store_mock_test.go b/pkg/ruler/store_mock_test.go index ab01f0aba06..8da014c4613 100644 --- a/pkg/ruler/store_mock_test.go +++ b/pkg/ruler/store_mock_test.go @@ -115,21 +115,30 @@ func newMockRuleStore(rules map[string]rules.RuleGroupList) *mockRuleStore { } } -func (m *mockRuleStore) ListAllRuleGroups(ctx context.Context) (map[string]rules.RuleGroupList, error) { +func (m *mockRuleStore) ListAllUsers(_ context.Context) ([]string, error) { m.mtx.Lock() defer m.mtx.Unlock() - copy := make(map[string]rules.RuleGroupList) + var result []string + for u := range m.rules { + result = append(result, u) + } + return result, nil +} + +func (m *mockRuleStore) LoadAllRuleGroups(_ context.Context) (map[string]rules.RuleGroupList, error) { + m.mtx.Lock() + defer m.mtx.Unlock() + + result := make(map[string]rules.RuleGroupList) for k, v := range m.rules { - rgl := make(rules.RuleGroupList, 0, len(v)) - rgl = append(rgl, v...) - copy[k] = rgl + result[k] = append(rules.RuleGroupList(nil), v...) } - return copy, nil + return result, nil } -func (m *mockRuleStore) ListRuleGroups(ctx context.Context, userID, namespace string) (rules.RuleGroupList, error) { +func (m *mockRuleStore) LoadRuleGroupsForUserAndNamespace(ctx context.Context, userID, namespace string) (rules.RuleGroupList, error) { m.mtx.Lock() defer m.mtx.Unlock() diff --git a/pkg/storegateway/gateway.go b/pkg/storegateway/gateway.go index 3b13764aec8..3e50b89a79c 100644 --- a/pkg/storegateway/gateway.go +++ b/pkg/storegateway/gateway.go @@ -135,7 +135,7 @@ func newStoreGateway(gatewayCfg Config, storageCfg cortex_tsdb.BlocksStorageConf logger: logger, bucketSync: promauto.With(reg).NewCounterVec(prometheus.CounterOpts{ Name: "cortex_storegateway_bucket_sync_total", - Help: "Total number of times the bucket sync operation trigged.", + Help: "Total number of times the bucket sync operation triggered.", }, []string{"reason"}), } @@ -289,7 +289,7 @@ func (g *StoreGateway) running(ctx context.Context) error { // replication set which we use to compare with the previous state. 
currRingState, _ := g.ring.GetAll(ring.BlocksSync) // nolint:errcheck - if hasRingTopologyChanged(ringLastState, currRingState) { + if ring.HasReplicationSetChanged(ringLastState, currRingState) { ringLastState = currRingState g.syncStores(ctx, syncReasonRingChange) } diff --git a/pkg/storegateway/gateway_ring.go b/pkg/storegateway/gateway_ring.go index 1b5c89e606a..5eb55475676 100644 --- a/pkg/storegateway/gateway_ring.go +++ b/pkg/storegateway/gateway_ring.go @@ -4,7 +4,6 @@ import ( "flag" "fmt" "os" - "sort" "time" "github.com/go-kit/kit/log/level" @@ -114,30 +113,3 @@ func (cfg *RingConfig) ToLifecyclerConfig() (ring.BasicLifecyclerConfig, error) NumTokens: RingNumTokens, }, nil } - -func hasRingTopologyChanged(before, after ring.ReplicationSet) bool { - beforeInstances := before.Ingesters - afterInstances := after.Ingesters - - if len(beforeInstances) != len(afterInstances) { - return true - } - - sort.Sort(ring.ByAddr(beforeInstances)) - sort.Sort(ring.ByAddr(afterInstances)) - - for i := 0; i < len(beforeInstances); i++ { - b := beforeInstances[i] - a := afterInstances[i] - - // Exclude the heartbeat timestamp from the comparison. - b.Timestamp = 0 - a.Timestamp = 0 - - if !b.Equal(a) { - return true - } - } - - return false -} diff --git a/pkg/util/shard.go b/pkg/util/shard.go new file mode 100644 index 00000000000..86759dc4aae --- /dev/null +++ b/pkg/util/shard.go @@ -0,0 +1,17 @@ +package util + +import ( + "crypto/md5" + "encoding/binary" +) + +// ShuffleShardSeed returns a seed for the random number generator, computed from the provided identifier. +func ShuffleShardSeed(identifier string) int64 { + // Use the identifier to compute a hash we'll use to seed the random number generator. + hasher := md5.New() + hasher.Write([]byte(identifier)) // nolint:errcheck + checksum := hasher.Sum(nil) + + // Generate the seed based on the first 64 bits of the checksum. + return int64(binary.BigEndian.Uint64(checksum)) +} diff --git a/pkg/util/validation/limits.go b/pkg/util/validation/limits.go index 2a95756b3d7..c546d1d1148 100644 --- a/pkg/util/validation/limits.go +++ b/pkg/util/validation/limits.go @@ -72,6 +72,7 @@ type Limits struct { // Ruler defaults and limits. RulerEvaluationDelay time.Duration `yaml:"ruler_evaluation_delay_duration"` + RulerTenantShardSize int `yaml:"ruler_tenant_shard_size"` // Store-gateway. StoreGatewayTenantShardSize int `yaml:"store_gateway_tenant_shard_size"` @@ -122,6 +123,7 @@ func (l *Limits) RegisterFlags(f *flag.FlagSet) { f.IntVar(&l.MaxQueriersPerTenant, "frontend.max-queriers-per-tenant", 0, "Maximum number of queriers that can handle requests for a single tenant. If set to 0 or value higher than number of available queriers, *all* queriers will handle requests for the tenant. Each frontend will select the same set of queriers for the same tenant (given that all queriers are connected to all frontends). This option only works with queriers connecting to the query-frontend, not when using downstream URL.") f.DurationVar(&l.RulerEvaluationDelay, "ruler.evaluation-delay-duration", 0, "Duration to delay the evaluation of rules to ensure the underlying metrics have been pushed to Cortex.") + f.IntVar(&l.RulerTenantShardSize, "ruler.tenant-shard-size", 0, "The default tenant's shard size when the shuffle-sharding strategy is used by the ruler. When this setting is specified in the per-tenant overrides, a value of 0 disables shuffle sharding for the tenant.") f.StringVar(&l.PerTenantOverrideConfig, "limits.per-user-override-config", "", "File name of per-user overrides.
[deprecated, use -runtime-config.file instead]") f.DurationVar(&l.PerTenantOverridePeriod, "limits.per-user-override-period", 10*time.Second, "Period with which to reload the overrides. [deprecated, use -runtime-config.reload-period instead]") @@ -370,6 +372,11 @@ func (o *Overrides) EvaluationDelay(userID string) time.Duration { return o.getOverridesForUser(userID).RulerEvaluationDelay } +// RulerTenantShardSize returns the shard size (number of rulers) used by this tenant when the shuffle-sharding strategy is used. +func (o *Overrides) RulerTenantShardSize(userID string) int { + return o.getOverridesForUser(userID).RulerTenantShardSize +} + // StoreGatewayTenantShardSize returns the store-gateway shard size for a given user. func (o *Overrides) StoreGatewayTenantShardSize(userID string) int { return o.getOverridesForUser(userID).StoreGatewayTenantShardSize
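
The ruler_test.go hunks above lean on a token-selection scheme that is easy to miss when reading the raw diff: each tenant gets its own deterministic token sequence (seeded via the new util.ShuffleShardSeed), and the rulers owning the first shard-size tokens of that sequence form the tenant's shard, which is why the test registers ring tokens such as userToken(user1, 0) + 1. The sketch below only illustrates that idea under stated assumptions: ringInstance, ownerOf and pickShardAddrs are hypothetical helpers written for this example, not the Cortex ring API, and the real subring construction in the ruler may differ in detail (for example in how token collisions are resolved).

```go
package main

import (
	"crypto/md5"
	"encoding/binary"
	"fmt"
	"math/rand"
	"sort"
)

// shuffleShardSeed mirrors util.ShuffleShardSeed from the diff above: the first
// 64 bits of the MD5 checksum of the identifier seed the per-tenant RNG.
func shuffleShardSeed(identifier string) int64 {
	sum := md5.Sum([]byte(identifier))
	return int64(binary.BigEndian.Uint64(sum[:]))
}

// ringInstance is a minimal stand-in for a ring entry: an instance address plus
// the tokens it has registered. It is NOT the Cortex ring type.
type ringInstance struct {
	addr   string
	tokens []uint32
}

// ownerOf returns the address of the instance owning the given key, i.e. the
// instance registering the smallest token strictly greater than the key
// (wrapping around). This is why the test registers tokens such as
// userToken(user1, 0) + 1 to force ownership of userToken(user1, 0).
// It assumes at least one token is registered.
func ownerOf(instances []ringInstance, key uint32) string {
	type tok struct {
		value uint32
		addr  string
	}
	var all []tok
	for _, in := range instances {
		for _, t := range in.tokens {
			all = append(all, tok{t, in.addr})
		}
	}
	sort.Slice(all, func(i, j int) bool { return all[i].value < all[j].value })
	for _, t := range all {
		if t.value > key {
			return t.addr
		}
	}
	return all[0].addr // wrap around the ring
}

// pickShardAddrs (hypothetical helper) selects the tenant's ruler shard: the
// owners of the first shardSize tokens drawn from the tenant-seeded RNG, which
// correspond to userToken(user, 0), userToken(user, 1), ... in the test.
// Collisions (two tokens landing on the same instance) are simply deduplicated
// here; the real implementation may handle them differently.
func pickShardAddrs(instances []ringInstance, user string, shardSize int) []string {
	r := rand.New(rand.NewSource(shuffleShardSeed(user)))
	seen := map[string]bool{}
	var addrs []string
	for i := 0; i < shardSize; i++ {
		addr := ownerOf(instances, r.Uint32())
		if !seen[addr] {
			seen[addr] = true
			addrs = append(addrs, addr)
		}
	}
	return addrs
}

func main() {
	instances := []ringInstance{
		{addr: "1.1.1.1:9999", tokens: []uint32{1000000, 2000000}},
		{addr: "2.2.2.2:9999", tokens: []uint32{3000000, 4000000}},
	}
	// With shard size 1, the same single ruler is always picked for "user-1".
	fmt.Println(pickShardAddrs(instances, "user-1", 1))
}
```

Run repeatedly, this prints the same single address for "user-1" every time, mirroring the "shuffle sharding, multiple rulers, shard size 1" case above, where one ruler receives all of the tenant's rule groups and the other receives none.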