diff --git a/CHANGELOG.md b/CHANGELOG.md
index 26c9de225bc..a114eb8bff4 100644
--- a/CHANGELOG.md
+++ b/CHANGELOG.md
@@ -37,6 +37,7 @@
 * [ENHANCEMENT] Add a metric `cortex_compactor_compaction_interval_seconds` for the compaction interval config value. #4040
 * [ENHANCEMENT] Ingester: added following per-ingester (instance) limits: max number of series in memory (`-ingester.instance-limits.max-series`), max number of users in memory (`-ingester.instance-limits.max-tenants`), max ingestion rate (`-ingester.instance-limits.max-ingestion-rate`), and max inflight requests (`-ingester.instance-limits.max-inflight-push-requests`). These limits are only used when using blocks storage. Limits can also be configured using runtime-config feature, and current values are exported as `cortex_ingester_instance_limits` metric. #3992.
 * [ENHANCEMENT] Cortex is now built with Go 1.16. #4062
+* [ENHANCEMENT] Ruler: Added `-ruler.enabled-tenants` and `-ruler.disabled-tenants` to explicitly enable or disable rules processing for specific tenants. #4074
 * [BUGFIX] Ruler-API: fix bug where `/api/v1/rules//` endpoint return `400` instead of `404`. #4013
 * [BUGFIX] Distributor: reverted changes done to rate limiting in #3825. #3948
 * [BUGFIX] Ingester: Fix race condition when opening and closing tsdb concurrently. #3959
diff --git a/docs/configuration/config-file-reference.md b/docs/configuration/config-file-reference.md
index 90ab93ec077..45fda0f8d5f 100644
--- a/docs/configuration/config-file-reference.md
+++ b/docs/configuration/config-file-reference.md
@@ -1584,6 +1584,18 @@ ring:
 # Enable the ruler api
 # CLI flag: -experimental.ruler.enable-api
 [enable_api: <boolean> | default = false]
+
+# Comma separated list of tenants whose rules this ruler can evaluate. If
+# specified, only these tenants will be handled by ruler, otherwise this ruler
+# can process rules from all tenants. Subject to sharding.
+# CLI flag: -ruler.enabled-tenants
+[enabled_tenants: <string> | default = ""]
+
+# Comma separated list of tenants whose rules this ruler cannot evaluate. If
+# specified, a ruler that would normally pick the specified tenant(s) for
+# processing will ignore them instead. Subject to sharding.
+# CLI flag: -ruler.disabled-tenants
+[disabled_tenants: <string> | default = ""]
 ```
 
 ### `ruler_storage_config`
diff --git a/pkg/compactor/compactor.go b/pkg/compactor/compactor.go
index dbe88933250..88f8121904c 100644
--- a/pkg/compactor/compactor.go
+++ b/pkg/compactor/compactor.go
@@ -172,18 +172,13 @@ type ConfigProvider interface {
 type Compactor struct {
     services.Service
 
-    compactorCfg Config
-    storageCfg   cortex_tsdb.BlocksStorageConfig
-    cfgProvider  ConfigProvider
-    logger       log.Logger
-    parentLogger log.Logger
-    registerer   prometheus.Registerer
-
-    // If empty, all users are enabled. If not empty, only users in the map are enabled (possibly owned by compactor, also subject to sharding configuration).
-    enabledUsers map[string]struct{}
-
-    // If empty, no users are disabled. If not empty, users in the map are disabled (not owned by this compactor).
-    disabledUsers map[string]struct{}
+    compactorCfg   Config
+    storageCfg     cortex_tsdb.BlocksStorageConfig
+    cfgProvider    ConfigProvider
+    logger         log.Logger
+    parentLogger   log.Logger
+    registerer     prometheus.Registerer
+    allowedTenants *util.AllowedTenants
 
     // Functions that creates bucket client, grouper, planner and compactor using the context.
     // Useful for injecting mock objects from tests.
@@ -272,6 +267,7 @@ func newCompactor(
         bucketClientFactory:    bucketClientFactory,
         blocksGrouperFactory:   blocksGrouperFactory,
         blocksCompactorFactory: blocksCompactorFactory,
+        allowedTenants:         util.NewAllowedTenants(compactorCfg.EnabledTenants, compactorCfg.DisabledTenants),
 
         compactionRunsStarted: promauto.With(registerer).NewCounter(prometheus.CounterOpts{
             Name: "cortex_compactor_runs_started_total",
@@ -321,21 +317,10 @@ func newCompactor(
     }
 
     if len(compactorCfg.EnabledTenants) > 0 {
-        c.enabledUsers = map[string]struct{}{}
-        for _, u := range compactorCfg.EnabledTenants {
-            c.enabledUsers[u] = struct{}{}
-        }
-
-        level.Info(c.logger).Log("msg", "using enabled users", "enabled", strings.Join(compactorCfg.EnabledTenants, ", "))
+        level.Info(c.logger).Log("msg", "compactor using enabled users", "enabled", strings.Join(compactorCfg.EnabledTenants, ", "))
     }
-
     if len(compactorCfg.DisabledTenants) > 0 {
-        c.disabledUsers = map[string]struct{}{}
-        for _, u := range compactorCfg.DisabledTenants {
-            c.disabledUsers[u] = struct{}{}
-        }
-
-        level.Info(c.logger).Log("msg", "using disabled users", "disabled", strings.Join(compactorCfg.DisabledTenants, ", "))
+        level.Info(c.logger).Log("msg", "compactor using disabled users", "disabled", strings.Join(compactorCfg.DisabledTenants, ", "))
     }
 
     c.Service = services.NewBasicService(c.starting, c.running, c.stopping)
@@ -711,7 +696,7 @@ func (c *Compactor) discoverUsers(ctx context.Context) ([]string, error) {
 }
 
 func (c *Compactor) ownUser(userID string) (bool, error) {
-    if !isAllowedUser(c.enabledUsers, c.disabledUsers, userID) {
+    if !c.allowedTenants.IsAllowed(userID) {
         return false, nil
     }
 
@@ -738,22 +723,6 @@ func (c *Compactor) ownUser(userID string) (bool, error) {
     return rs.Instances[0].Addr == c.ringLifecycler.Addr, nil
 }
 
-func isAllowedUser(enabledUsers, disabledUsers map[string]struct{}, userID string) bool {
-    if len(enabledUsers) > 0 {
-        if _, ok := enabledUsers[userID]; !ok {
-            return false
-        }
-    }
-
-    if len(disabledUsers) > 0 {
-        if _, ok := disabledUsers[userID]; ok {
-            return false
-        }
-    }
-
-    return true
-}
-
 const compactorMetaPrefix = "compactor-meta-"
 
 // metaSyncDirForUser returns directory to store cached meta files.
diff --git a/pkg/compactor/compactor_test.go b/pkg/compactor/compactor_test.go
index 525bc288f04..9d3c53c57e9 100644
--- a/pkg/compactor/compactor_test.go
+++ b/pkg/compactor/compactor_test.go
@@ -1152,48 +1152,6 @@ func mockDeletionMarkJSON(id string, deletionTime time.Time) string {
     return string(content)
 }
 
-func TestAllowedUser(t *testing.T) {
-    testCases := map[string]struct {
-        enabled, disabled map[string]struct{}
-        user              string
-        expected          bool
-    }{
-        "no enabled or disabled": {
-            user:     "test",
-            expected: true,
-        },
-
-        "only enabled, enabled": {
-            enabled:  map[string]struct{}{"user": {}},
-            user:     "user",
-            expected: true,
-        },
-
-        "only enabled, disabled": {
-            enabled:  map[string]struct{}{"user": {}},
-            user:     "not user",
-            expected: false,
-        },
-
-        "only disabled, disabled": {
-            disabled: map[string]struct{}{"user": {}},
-            user:     "user",
-            expected: false,
-        },
-
-        "only disabled, enabled": {
-            disabled: map[string]struct{}{"user": {}},
-            user:     "not user",
-            expected: true,
-        },
-    }
-    for name, tc := range testCases {
-        t.Run(name, func(t *testing.T) {
-            require.Equal(t, tc.expected, isAllowedUser(tc.enabled, tc.disabled, tc.user))
-        })
-    }
-}
-
 func TestCompactor_DeleteLocalSyncFiles(t *testing.T) {
     numUsers := 10
 
diff --git a/pkg/ruler/ruler.go b/pkg/ruler/ruler.go
index 0d1c7f203d0..aba2f01d8d1 100644
--- a/pkg/ruler/ruler.go
+++ b/pkg/ruler/ruler.go
@@ -107,6 +107,9 @@ type Config struct {
 
     EnableAPI bool `yaml:"enable_api"`
 
+    EnabledTenants  flagext.StringSliceCSV `yaml:"enabled_tenants"`
+    DisabledTenants flagext.StringSliceCSV `yaml:"disabled_tenants"`
+
     RingCheckPeriod time.Duration `yaml:"-"`
 }
 
@@ -163,6 +166,9 @@ func (cfg *Config) RegisterFlags(f *flag.FlagSet) {
     f.DurationVar(&cfg.ForGracePeriod, "ruler.for-grace-period", 10*time.Minute, `Minimum duration between alert and restored "for" state. This is maintained only for alerts with configured "for" time greater than grace period.`)
     f.DurationVar(&cfg.ResendDelay, "ruler.resend-delay", time.Minute, `Minimum amount of time to wait before resending an alert to Alertmanager.`)
 
+    f.Var(&cfg.EnabledTenants, "ruler.enabled-tenants", "Comma separated list of tenants whose rules this ruler can evaluate. If specified, only these tenants will be handled by ruler, otherwise this ruler can process rules from all tenants. Subject to sharding.")
+    f.Var(&cfg.DisabledTenants, "ruler.disabled-tenants", "Comma separated list of tenants whose rules this ruler cannot evaluate. If specified, a ruler that would normally pick the specified tenant(s) for processing will ignore them instead. Subject to sharding.")
+
     cfg.RingCheckPeriod = 5 * time.Second
 }
 
@@ -224,6 +230,8 @@ type Ruler struct {
     ringCheckErrors prometheus.Counter
     rulerSync       *prometheus.CounterVec
 
+    allowedTenants *util.AllowedTenants
+
     registry prometheus.Registerer
     logger   log.Logger
 }
@@ -231,13 +239,14 @@ type Ruler struct {
 // NewRuler creates a new ruler from a distributor and chunk store.
 func NewRuler(cfg Config, manager MultiTenantManager, reg prometheus.Registerer, logger log.Logger, ruleStore rulestore.RuleStore, limits RulesLimits) (*Ruler, error) {
     ruler := &Ruler{
-        cfg:         cfg,
-        store:       ruleStore,
-        manager:     manager,
-        registry:    reg,
-        logger:      logger,
-        limits:      limits,
-        clientsPool: newRulerClientPool(cfg.ClientTLSConfig, logger, reg),
+        cfg:            cfg,
+        store:          ruleStore,
+        manager:        manager,
+        registry:       reg,
+        logger:         logger,
+        limits:         limits,
+        clientsPool:    newRulerClientPool(cfg.ClientTLSConfig, logger, reg),
+        allowedTenants: util.NewAllowedTenants(cfg.EnabledTenants, cfg.DisabledTenants),
 
         ringCheckErrors: promauto.With(reg).NewCounter(prometheus.CounterOpts{
             Name: "cortex_ruler_ring_check_errors_total",
@@ -250,6 +259,13 @@ func NewRuler(cfg Config, manager MultiTenantManager, reg prometheus.Registerer,
         }, []string{"reason"}),
     }
 
+    if len(cfg.EnabledTenants) > 0 {
+        level.Info(ruler.logger).Log("msg", "ruler using enabled users", "enabled", strings.Join(cfg.EnabledTenants, ", "))
+    }
+    if len(cfg.DisabledTenants) > 0 {
+        level.Info(ruler.logger).Log("msg", "ruler using disabled users", "disabled", strings.Join(cfg.DisabledTenants, ", "))
+    }
+
     if cfg.EnableSharding {
         ringStore, err := kv.NewClient(
             cfg.Ring.KVStore,
@@ -472,20 +488,32 @@ func (r *Ruler) syncRules(ctx context.Context, reason string) {
     r.manager.SyncRuleGroups(ctx, configs)
 }
 
-func (r *Ruler) listRules(ctx context.Context) (map[string]rulespb.RuleGroupList, error) {
+func (r *Ruler) listRules(ctx context.Context) (result map[string]rulespb.RuleGroupList, err error) {
     switch {
     case !r.cfg.EnableSharding:
-        return r.listRulesNoSharding(ctx)
+        result, err = r.listRulesNoSharding(ctx)
 
     case r.cfg.ShardingStrategy == util.ShardingStrategyDefault:
-        return r.listRulesShardingDefault(ctx)
+        result, err = r.listRulesShardingDefault(ctx)
 
     case r.cfg.ShardingStrategy == util.ShardingStrategyShuffle:
-        return r.listRulesShuffleSharding(ctx)
+        result, err = r.listRulesShuffleSharding(ctx)
 
     default:
         return nil, errors.New("invalid sharding configuration")
     }
+
+    if err != nil {
+        return
+    }
+
+    for userID := range result {
+        if !r.allowedTenants.IsAllowed(userID) {
+            level.Debug(r.logger).Log("msg", "ignoring rule groups for user, not allowed", "user", userID)
+            delete(result, userID)
+        }
+    }
+    return
 }
 
 func (r *Ruler) listRulesNoSharding(ctx context.Context) (map[string]rulespb.RuleGroupList, error) {
diff --git a/pkg/ruler/ruler_test.go b/pkg/ruler/ruler_test.go
index af27d948c9c..1426d005392 100644
--- a/pkg/ruler/ruler_test.go
+++ b/pkg/ruler/ruler_test.go
@@ -277,6 +277,8 @@ func TestSharding(t *testing.T) {
         shardingStrategy string
         shuffleShardSize int
         setupRing        func(*ring.Desc)
+        enabledUsers     []string
+        disabledUsers    []string
 
         expectedRules expectedRulesMap
     }
@@ -304,6 +306,23 @@ func TestSharding(t *testing.T) {
             expectedRules: expectedRulesMap{ruler1: allRules},
         },
 
+        "no sharding, single user allowed": {
+            sharding:     false,
+            enabledUsers: []string{user1},
+            expectedRules: expectedRulesMap{ruler1: map[string]rulespb.RuleGroupList{
+                user1: {user1Group1, user1Group2},
+            }},
+        },
+
+        "no sharding, single user disabled": {
+            sharding:      false,
+            disabledUsers: []string{user1},
+            expectedRules: expectedRulesMap{ruler1: map[string]rulespb.RuleGroupList{
+                user2: {user2Group1},
+                user3: {user3Group1},
+            }},
+        },
+
         "default sharding, single ruler": {
             sharding:         true,
             shardingStrategy: util.ShardingStrategyDefault,
@@ -313,6 +332,31 @@ func TestSharding(t *testing.T) {
             expectedRules: expectedRulesMap{ruler1: allRules},
         },
 
+        "default sharding, single ruler, single enabled user": {
+            sharding:         true,
+            shardingStrategy: util.ShardingStrategyDefault,
+            enabledUsers:     []string{user1},
+            setupRing: func(desc *ring.Desc) {
+                desc.AddIngester(ruler1, ruler1Addr, "", []uint32{0}, ring.ACTIVE, time.Now())
+            },
+            expectedRules: expectedRulesMap{ruler1: map[string]rulespb.RuleGroupList{
+                user1: {user1Group1, user1Group2},
+            }},
+        },
+
+        "default sharding, single ruler, single disabled user": {
+            sharding:         true,
+            shardingStrategy: util.ShardingStrategyDefault,
+            disabledUsers:    []string{user1},
+            setupRing: func(desc *ring.Desc) {
+                desc.AddIngester(ruler1, ruler1Addr, "", []uint32{0}, ring.ACTIVE, time.Now())
+            },
+            expectedRules: expectedRulesMap{ruler1: map[string]rulespb.RuleGroupList{
+                user2: {user2Group1},
+                user3: {user3Group1},
+            }},
+        },
+
         "default sharding, multiple ACTIVE rulers": {
             sharding:         true,
             shardingStrategy: util.ShardingStrategyDefault,
@@ -334,6 +378,46 @@ func TestSharding(t *testing.T) {
             },
         },
 
+        "default sharding, multiple ACTIVE rulers, single enabled user": {
+            sharding:         true,
+            shardingStrategy: util.ShardingStrategyDefault,
+            enabledUsers:     []string{user1},
+            setupRing: func(desc *ring.Desc) {
+                desc.AddIngester(ruler1, ruler1Addr, "", sortTokens([]uint32{user1Group1Token + 1, user2Group1Token + 1}), ring.ACTIVE, time.Now())
+                desc.AddIngester(ruler2, ruler2Addr, "", sortTokens([]uint32{user1Group2Token + 1, user3Group1Token + 1}), ring.ACTIVE, time.Now())
+            },
+
+            expectedRules: expectedRulesMap{
+                ruler1: map[string]rulespb.RuleGroupList{
+                    user1: {user1Group1},
+                },
+
+                ruler2: map[string]rulespb.RuleGroupList{
+                    user1: {user1Group2},
+                },
+            },
+        },
+
+        "default sharding, multiple ACTIVE rulers, single disabled user": {
+            sharding:         true,
+            shardingStrategy: util.ShardingStrategyDefault,
+            disabledUsers:    []string{user1},
+            setupRing: func(desc *ring.Desc) {
+                desc.AddIngester(ruler1, ruler1Addr, "", sortTokens([]uint32{user1Group1Token + 1, user2Group1Token + 1}), ring.ACTIVE, time.Now())
+                desc.AddIngester(ruler2, ruler2Addr, "", sortTokens([]uint32{user1Group2Token + 1, user3Group1Token + 1}), ring.ACTIVE, time.Now())
+            },
+
+            expectedRules: expectedRulesMap{
+                ruler1: map[string]rulespb.RuleGroupList{
+                    user2: {user2Group1},
+                },
+
+                ruler2: map[string]rulespb.RuleGroupList{
+                    user3: {user3Group1},
+                },
+            },
+        },
+
         "default sharding, unhealthy ACTIVE ruler": {
             sharding:         true,
             shardingStrategy: util.ShardingStrategyDefault,
@@ -503,6 +587,51 @@ func TestSharding(t *testing.T) {
                 },
             },
         },
+
+        "shuffle sharding, three rulers, shard size 2, single enabled user": {
+            sharding:         true,
+            shardingStrategy: util.ShardingStrategyShuffle,
+            shuffleShardSize: 2,
+            enabledUsers:     []string{user1},
+
+            setupRing: func(desc *ring.Desc) {
+                desc.AddIngester(ruler1, ruler1Addr, "", sortTokens([]uint32{userToken(user1, 0) + 1, user1Group1Token + 1}), ring.ACTIVE, time.Now())
+                desc.AddIngester(ruler2, ruler2Addr, "", sortTokens([]uint32{userToken(user1, 1) + 1, user1Group2Token + 1, userToken(user2, 1) + 1, userToken(user3, 1) + 1}), ring.ACTIVE, time.Now())
+                desc.AddIngester(ruler3, ruler3Addr, "", sortTokens([]uint32{userToken(user2, 0) + 1, userToken(user3, 0) + 1, user2Group1Token + 1, user3Group1Token + 1}), ring.ACTIVE, time.Now())
+            },
+
+            expectedRules: expectedRulesMap{
+                ruler1: map[string]rulespb.RuleGroupList{
+                    user1: {user1Group1},
+                },
+                ruler2: map[string]rulespb.RuleGroupList{
+                    user1: {user1Group2},
+                },
+                ruler3: map[string]rulespb.RuleGroupList{},
+            },
+        },
+
+        "shuffle sharding, three rulers, shard size 2, single disabled user": {
+            sharding:         true,
+            shardingStrategy: util.ShardingStrategyShuffle,
+            shuffleShardSize: 2,
+            disabledUsers:    []string{user1},
+
+            setupRing: func(desc *ring.Desc) {
+                desc.AddIngester(ruler1, ruler1Addr, "", sortTokens([]uint32{userToken(user1, 0) + 1, user1Group1Token + 1}), ring.ACTIVE, time.Now())
+                desc.AddIngester(ruler2, ruler2Addr, "", sortTokens([]uint32{userToken(user1, 1) + 1, user1Group2Token + 1, userToken(user2, 1) + 1, userToken(user3, 1) + 1}), ring.ACTIVE, time.Now())
+                desc.AddIngester(ruler3, ruler3Addr, "", sortTokens([]uint32{userToken(user2, 0) + 1, userToken(user3, 0) + 1, user2Group1Token + 1, user3Group1Token + 1}), ring.ACTIVE, time.Now())
+            },
+
+            expectedRules: expectedRulesMap{
+                ruler1: map[string]rulespb.RuleGroupList{},
+                ruler2: map[string]rulespb.RuleGroupList{},
+                ruler3: map[string]rulespb.RuleGroupList{
+                    user2: {user2Group1},
+                    user3: {user3Group1},
+                },
+            },
+        },
     }
 
     for name, tc := range testCases {
@@ -524,6 +653,8 @@ func TestSharding(t *testing.T) {
                 HeartbeatTimeout: 1 * time.Minute,
             },
             FlushCheckPeriod: 0,
+            EnabledTenants:   tc.enabledUsers,
+            DisabledTenants:  tc.disabledUsers,
         }
 
         r, cleanup := newRuler(t, cfg)
diff --git a/pkg/util/allowed_tenants.go b/pkg/util/allowed_tenants.go
new file mode 100644
index 00000000000..0f610a5fdb1
--- /dev/null
+++ b/pkg/util/allowed_tenants.go
@@ -0,0 +1,45 @@
+package util
+
+type AllowedTenants struct {
+    // If empty, all tenants are enabled. If not empty, only tenants in the map are enabled.
+    enabled map[string]struct{}
+
+    // If empty, no tenants are disabled. If not empty, tenants in the map are disabled.
+    disabled map[string]struct{}
+}
+
+func NewAllowedTenants(enabled []string, disabled []string) *AllowedTenants {
+    a := &AllowedTenants{}
+
+    if len(enabled) > 0 {
+        a.enabled = make(map[string]struct{}, len(enabled))
+        for _, u := range enabled {
+            a.enabled[u] = struct{}{}
+        }
+    }
+
+    if len(disabled) > 0 {
+        a.disabled = make(map[string]struct{}, len(disabled))
+        for _, u := range disabled {
+            a.disabled[u] = struct{}{}
+        }
+    }
+
+    return a
+}
+
+func (a *AllowedTenants) IsAllowed(tenantID string) bool {
+    if len(a.enabled) > 0 {
+        if _, ok := a.enabled[tenantID]; !ok {
+            return false
+        }
+    }
+
+    if len(a.disabled) > 0 {
+        if _, ok := a.disabled[tenantID]; ok {
+            return false
+        }
+    }
+
+    return true
+}
diff --git a/pkg/util/allowed_tenants_test.go b/pkg/util/allowed_tenants_test.go
new file mode 100644
index 00000000000..b51cf930e74
--- /dev/null
+++ b/pkg/util/allowed_tenants_test.go
@@ -0,0 +1,38 @@
+package util
+
+import (
+    "testing"
+
+    "github.com/stretchr/testify/require"
+)
+
+func TestAllowedTenants_NoConfig(t *testing.T) {
+    a := NewAllowedTenants(nil, nil)
+    require.True(t, a.IsAllowed("all"))
+    require.True(t, a.IsAllowed("tenants"))
+    require.True(t, a.IsAllowed("allowed"))
+}
+
+func TestAllowedTenants_Enabled(t *testing.T) {
+    a := NewAllowedTenants([]string{"A", "B"}, nil)
+    require.True(t, a.IsAllowed("A"))
+    require.True(t, a.IsAllowed("B"))
+    require.False(t, a.IsAllowed("C"))
+    require.False(t, a.IsAllowed("D"))
+}
+
+func TestAllowedTenants_Disabled(t *testing.T) {
+    a := NewAllowedTenants(nil, []string{"A", "B"})
+    require.False(t, a.IsAllowed("A"))
+    require.False(t, a.IsAllowed("B"))
+    require.True(t, a.IsAllowed("C"))
+    require.True(t, a.IsAllowed("D"))
+}
+
+func TestAllowedTenants_Combination(t *testing.T) {
+    a := NewAllowedTenants([]string{"A", "B"}, []string{"B", "C"})
+    require.True(t, a.IsAllowed("A"))  // enabled, and not disabled
+    require.False(t, a.IsAllowed("B")) // enabled, but also disabled
+    require.False(t, a.IsAllowed("C")) // disabled
+    require.False(t, a.IsAllowed("D")) // not enabled
+}
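
Usage note (illustrative only, not part of the diff): a minimal sketch of how the new `util.AllowedTenants` helper is intended to be used. The constructor and `IsAllowed` come from `pkg/util/allowed_tenants.go` above, and the filtering loop mirrors the one added to `(*Ruler).listRules`; the tenant names and rule file names are made up for the example.

package main

import (
    "fmt"

    "github.com/cortexproject/cortex/pkg/util"
)

func main() {
    // Roughly equivalent to running a ruler with:
    //   -ruler.enabled-tenants=team-a,team-b -ruler.disabled-tenants=team-b
    // A tenant must be in the enabled list (when set) and not in the disabled
    // list, so only "team-a" remains allowed.
    allowed := util.NewAllowedTenants([]string{"team-a", "team-b"}, []string{"team-b"})

    // Hypothetical per-tenant rule groups, keyed by tenant ID.
    ruleGroups := map[string][]string{
        "team-a": {"alerts.yaml"},
        "team-b": {"alerts.yaml"},
        "team-c": {"recording.yaml"},
    }

    // Same filtering as the loop added to (*Ruler).listRules: drop rule groups
    // belonging to tenants this ruler is not allowed to evaluate.
    for tenantID := range ruleGroups {
        if !allowed.IsAllowed(tenantID) {
            delete(ruleGroups, tenantID)
        }
    }

    fmt.Println(ruleGroups) // map[team-a:[alerts.yaml]]
}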