Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -51,6 +51,7 @@
* [FEATURE] Shuffle sharding: added support for shuffle-sharding queriers in the query-frontend. When configured (`-frontend.max-queriers-per-tenant` globally, or using per-tenant limit `max_queriers_per_tenant`), each tenant's requests will be handled by a different set of queriers. #3113 #3257
* [FEATURE] Shuffle sharding: added support for shuffle-sharding ingesters on the read path. When ingesters shuffle-sharding is enabled and `-querier.shuffle-sharding-ingesters-lookback-period` is set, queriers will fetch in-memory series from the minimum set of required ingesters, selecting only ingesters which may have received series since 'now - lookback period'. #3252
* [FEATURE] Query-frontend: added `compression` config to support results cache with compression. #3217
* [ENHANCEMENT] Ruler: Introduces two new limits `-ruler.max-rules-per-rule-group` and `-ruler.max-rule-groups-per-tenant` to control the number of rules per rule group and the total number of rule groups for a given user. They are disabled by default. #3366
* [ENHANCEMENT] Allow to specify multiple comma-separated Cortex services to `-target` CLI option (or its respective YAML config option). For example, `-target=all,compactor` can be used to start Cortex single-binary with compactor as well. #3275
* [ENHANCEMENT] Expose additional HTTP configs for the S3 backend client. New flags are listed below: #3244
- `-blocks-storage.s3.http.idle-conn-timeout`
Expand Down
8 changes: 8 additions & 0 deletions docs/configuration/config-file-reference.md
Original file line number Diff line number Diff line change
Expand Up @@ -2903,6 +2903,14 @@ The `limits_config` configures default and per-tenant limits imposed by Cortex s
# CLI flag: -ruler.tenant-shard-size
[ruler_tenant_shard_size: <int> | default = 0]

# Maximum number of rules per rule group per-tenant. 0 to disable.
# CLI flag: -ruler.max-rules-per-rule-group
[ruler_max_rules_per_rule_group: <int> | default = 0]

# Maximum number of rule groups per-tenant. 0 to disable.
# CLI flag: -ruler.max-rule-groups-per-tenant
[ruler_max_rule_groups_per_tenant: <int> | default = 0]

# The default tenant's shard size when the shuffle-sharding strategy is used.
# Must be set when the store-gateway sharding is enabled with the
# shuffle-sharding strategy. When this setting is specified in the per-tenant
Expand Down
19 changes: 19 additions & 0 deletions pkg/ruler/api.go
Original file line number Diff line number Diff line change
Expand Up @@ -472,6 +472,25 @@ func (a *API) CreateRuleGroup(w http.ResponseWriter, req *http.Request) {
return
}

if err := a.ruler.AssertMaxRulesPerRuleGroup(userID, len(rg.Rules)); err != nil {
level.Error(logger).Log("msg", "limit validation failure", "err", err.Error(), "user", userID)
http.Error(w, err.Error(), http.StatusBadRequest)
return
}

rgs, err := a.store.ListRuleGroupsForUserAndNamespace(req.Context(), userID, "")
if err != nil {
level.Error(logger).Log("msg", "unable to fetch current rule groups for validation", "err", err.Error(), "user", userID)
http.Error(w, err.Error(), http.StatusInternalServerError)
return
}

if err := a.ruler.AssertMaxRuleGroups(userID, len(rgs)); err != nil {
level.Error(logger).Log("msg", "limit validation failure", "err", err.Error(), "user", userID)
http.Error(w, err.Error(), http.StatusBadRequest)
return
}

rgProto := store.ToProto(userID, namespace, rg)

level.Debug(logger).Log("msg", "attempting to store rulegroup", "userID", userID, "group", rgProto.String())
Expand Down
67 changes: 67 additions & 0 deletions pkg/ruler/api_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -299,6 +299,73 @@ func TestRuler_DeleteNamespace(t *testing.T) {
require.Equal(t, "{\"status\":\"error\",\"data\":null,\"errorType\":\"server_error\",\"error\":\"unable to delete rg\"}", w.Body.String())
}

// TestRuler_Limits posts rule groups to CreateRuleGroup as "user1" and checks
// that requests violating the configured ruler limits are rejected with the
// expected HTTP status code and response body.
func TestRuler_Limits(t *testing.T) {
// The mock rule store is built from an empty map.
cfg, cleanup := defaultRulerConfig(newMockRuleStore(make(map[string]rules.RuleGroupList)))
defer cleanup()

r, rcleanup := newTestRuler(t, cfg)
defer rcleanup()
defer services.StopAndAwaitTerminated(context.Background(), r) //nolint:errcheck

// Override the ruler limits so that both limits are set to 1 for every case.
r.limits = &ruleLimits{maxRuleGroups: 1, maxRulesPerRuleGroup: 1}

a := NewAPI(r, r.store)

// Each case carries the YAML request body (input) plus the expected response
// body (output) and HTTP status. The err field is not set by any case and is
// not read by the loop below.
tc := []struct {
name string
input string
output string
err error
status int
}{
{
name: "when exceeding the rules per rule group limit",
status: 400,
input: `
name: test
interval: 15s
rules:
- record: up_rule
expr: up{}
- alert: up_alert
expr: sum(up{}) > 1
for: 30s
annotations:
test: test
labels:
test: test
`,
output: "per-user rules per rule group limit (limit: 1 actual: 2) exceeded\n",
},
// NOTE(review): the case name refers to the rule group limit, but the
// expected body is the rules-per-rule-group message with "limit: 1 actual: 1"
// for a single-rule input — confirm which limit this case is meant to exercise.
{
name: "when exceeding the rule group limit",
status: 400,
input: `
name: test
interval: 15s
rules:
- record: up_rule
expr: up{}
`,
output: "per-user rules per rule group limit (limit: 1 actual: 1) exceeded\n",
},
}

for _, tt := range tc {
t.Run(tt.name, func(t *testing.T) {
// A fresh router per case routes the POST to the CreateRuleGroup handler.
router := mux.NewRouter()
router.Path("/api/v1/rules/{namespace}").Methods("POST").HandlerFunc(a.CreateRuleGroup)
// POST
req := requestFor(t, http.MethodPost, "https://localhost:8080/api/v1/rules/namespace", strings.NewReader(tt.input), "user1")
w := httptest.NewRecorder()

router.ServeHTTP(w, req)
// Both the status code and the exact response body must match.
require.Equal(t, tt.status, w.Code)
require.Equal(t, tt.output, w.Body.String())
})
}
}

func requestFor(t *testing.T, method string, url string, body io.Reader, userID string) *http.Request {
t.Helper()

Expand Down
2 changes: 2 additions & 0 deletions pkg/ruler/compat.go
Original file line number Diff line number Diff line change
Expand Up @@ -76,6 +76,8 @@ func (t *PusherAppendable) Appender(ctx context.Context) storage.Appender {
type RulesLimits interface {
// EvaluationDelay returns a per-user duration.
// NOTE(review): presumably the rule evaluation delay configured via
// -ruler.evaluation-delay-duration — confirm against the implementation.
EvaluationDelay(userID string) time.Duration
// RulerTenantShardSize returns the ruler shard size for the given user.
RulerTenantShardSize(userID string) int
// RulerMaxRuleGroupsPerTenant returns the maximum number of rule groups
// for the given user.
RulerMaxRuleGroupsPerTenant(userID string) int
// RulerMaxRulesPerRuleGroup returns the maximum number of rules per rule
// group for the given user.
RulerMaxRulesPerRuleGroup(userID string) int
}

// engineQueryFunc returns a new query function using the rules.EngineQueryFunc function
Expand Down
35 changes: 35 additions & 0 deletions pkg/ruler/ruler.go
Original file line number Diff line number Diff line change
Expand Up @@ -56,6 +56,10 @@ const (
rulerSyncReasonInitial = "initial"
rulerSyncReasonPeriodic = "periodic"
rulerSyncReasonRingChange = "ring-change"

// Limit errors
errMaxRuleGroupsPerUserLimitExceeded = "per-user rule groups limit (limit: %d actual: %d) exceeded"
errMaxRulesPerRuleGroupPerUserLimitExceeded = "per-user rules per rule group limit (limit: %d actual: %d) exceeded"
)

// Config is the configuration for the recording rules server.
Expand Down Expand Up @@ -736,3 +740,34 @@ func (r *Ruler) Rules(ctx context.Context, in *RulesRequest) (*RulesResponse, er

return &RulesResponse{Groups: groupDescs}, nil
}

// AssertMaxRuleGroups checks the given rule group count against the per-tenant
// rule group limit returned by RulerMaxRuleGroupsPerTenant.
//
// rg is compared as a count that must stay strictly below the limit: nil is
// returned when the limit is disabled (zero or negative) or when rg is lower
// than the limit. Once rg has reached the limit, an error reporting the limit
// and the actual count is returned.
func (r *Ruler) AssertMaxRuleGroups(userID string, rg int) error {
	allowed := r.limits.RulerMaxRuleGroupsPerTenant(userID)

	// Only a positive limit is enforced, and only once the count has reached it.
	if allowed > 0 && rg >= allowed {
		return fmt.Errorf(errMaxRuleGroupsPerUserLimitExceeded, allowed, rg)
	}

	return nil
}

// AssertMaxRulesPerRuleGroup validates the number of rules in a rule group
// against the per-tenant limit returned by RulerMaxRulesPerRuleGroup.
//
// It returns nil when the limit is disabled (zero or negative) or when rules
// does not exceed the limit; a group holding exactly the maximum number of
// rules is accepted. Otherwise it returns an error reporting both the limit
// and the actual number of rules.
func (r *Ruler) AssertMaxRulesPerRuleGroup(userID string, rules int) error {
	limit := r.limits.RulerMaxRulesPerRuleGroup(userID)

	// A non-positive limit means the check is disabled.
	if limit <= 0 {
		return nil
	}

	// The limit is a maximum for the group being validated, so only counts
	// strictly above it are rejected.
	if rules <= limit {
		return nil
	}

	return fmt.Errorf(errMaxRulesPerRuleGroupPerUserLimitExceeded, limit, rules)
}
16 changes: 13 additions & 3 deletions pkg/ruler/ruler_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -63,8 +63,10 @@ func defaultRulerConfig(store rules.RuleStore) (Config, func()) {
}

type ruleLimits struct {
evalDelay time.Duration
tenantShard int
evalDelay time.Duration
tenantShard int
maxRulesPerRuleGroup int
maxRuleGroups int
}

func (r ruleLimits) EvaluationDelay(_ string) time.Duration {
Expand All @@ -75,6 +77,14 @@ func (r ruleLimits) RulerTenantShardSize(_ string) int {
return r.tenantShard
}

// RulerMaxRuleGroupsPerTenant returns the maxRuleGroups value held by this
// test limits struct; the user ID argument is ignored.
func (r ruleLimits) RulerMaxRuleGroupsPerTenant(_ string) int {
return r.maxRuleGroups
}

// RulerMaxRulesPerRuleGroup returns the maxRulesPerRuleGroup value held by
// this test limits struct; the user ID argument is ignored.
func (r ruleLimits) RulerMaxRulesPerRuleGroup(_ string) int {
return r.maxRulesPerRuleGroup
}

func testSetup(t *testing.T, cfg Config) (*promql.Engine, storage.QueryableFunc, Pusher, log.Logger, RulesLimits, func()) {
dir, err := ioutil.TempDir("", filepath.Base(t.Name()))
testutil.Ok(t, err)
Expand All @@ -101,7 +111,7 @@ func testSetup(t *testing.T, cfg Config) (*promql.Engine, storage.QueryableFunc,
l := log.NewLogfmtLogger(os.Stdout)
l = level.NewFilter(l, level.AllowInfo())

return engine, noopQueryable, pusher, l, ruleLimits{evalDelay: 0}, cleanup
return engine, noopQueryable, pusher, l, ruleLimits{evalDelay: 0, maxRuleGroups: 20, maxRulesPerRuleGroup: 15}, cleanup
}

func newManager(t *testing.T, cfg Config) (*DefaultMultiTenantManager, func()) {
Expand Down
4 changes: 2 additions & 2 deletions pkg/ruler/store_mock_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -144,7 +144,7 @@ func (m *mockRuleStore) ListRuleGroupsForUserAndNamespace(_ context.Context, use

userRules, exists := m.rules[userID]
if !exists {
return nil, rules.ErrUserNotFound
return rules.RuleGroupList{}, nil
}

if namespace == "" {
Expand All @@ -160,7 +160,7 @@ func (m *mockRuleStore) ListRuleGroupsForUserAndNamespace(_ context.Context, use
}

if len(namespaceRules) == 0 {
return nil, rules.ErrGroupNamespaceNotFound
return rules.RuleGroupList{}, nil
}

return namespaceRules, nil
Expand Down
18 changes: 16 additions & 2 deletions pkg/util/validation/limits.go
Original file line number Diff line number Diff line change
Expand Up @@ -71,8 +71,10 @@ type Limits struct {
MaxQueriersPerTenant int `yaml:"max_queriers_per_tenant"`

// Ruler defaults and limits.
RulerEvaluationDelay time.Duration `yaml:"ruler_evaluation_delay_duration"`
RulerTenantShardSize int `yaml:"ruler_tenant_shard_size"`
RulerEvaluationDelay time.Duration `yaml:"ruler_evaluation_delay_duration"`
RulerTenantShardSize int `yaml:"ruler_tenant_shard_size"`
RulerMaxRulesPerRuleGroup int `yaml:"ruler_max_rules_per_rule_group"`
RulerMaxRuleGroupsPerTenant int `yaml:"ruler_max_rule_groups_per_tenant"`

// Store-gateway.
StoreGatewayTenantShardSize int `yaml:"store_gateway_tenant_shard_size"`
Expand Down Expand Up @@ -124,6 +126,8 @@ func (l *Limits) RegisterFlags(f *flag.FlagSet) {

f.DurationVar(&l.RulerEvaluationDelay, "ruler.evaluation-delay-duration", 0, "Duration to delay the evaluation of rules to ensure the underlying metrics have been pushed to Cortex.")
f.IntVar(&l.RulerTenantShardSize, "ruler.tenant-shard-size", 0, "The default tenant's shard size when the shuffle-sharding strategy is used by ruler. When this setting is specified in the per-tenant overrides, a value of 0 disables shuffle sharding for the tenant.")
f.IntVar(&l.RulerMaxRulesPerRuleGroup, "ruler.max-rules-per-rule-group", 0, "Maximum number of rules per rule group per-tenant. 0 to disable.")
f.IntVar(&l.RulerMaxRuleGroupsPerTenant, "ruler.max-rule-groups-per-tenant", 0, "Maximum number of rule groups per-tenant. 0 to disable.")

f.StringVar(&l.PerTenantOverrideConfig, "limits.per-user-override-config", "", "File name of per-user overrides. [deprecated, use -runtime-config.file instead]")
f.DurationVar(&l.PerTenantOverridePeriod, "limits.per-user-override-period", 10*time.Second, "Period with which to reload the overrides. [deprecated, use -runtime-config.reload-period instead]")
Expand Down Expand Up @@ -377,6 +381,16 @@ func (o *Overrides) RulerTenantShardSize(userID string) int {
return o.getOverridesForUser(userID).RulerTenantShardSize
}

// RulerMaxRulesPerRuleGroup returns the maximum number of rules per rule group for a given user.
// The value is read from the limits resolved for that user by getOverridesForUser.
// The -ruler.max-rules-per-rule-group flag documents 0 as disabling the limit.
func (o *Overrides) RulerMaxRulesPerRuleGroup(userID string) int {
return o.getOverridesForUser(userID).RulerMaxRulesPerRuleGroup
}

// RulerMaxRuleGroupsPerTenant returns the maximum number of rule groups for a given user.
// The value is read from the limits resolved for that user by getOverridesForUser.
// The -ruler.max-rule-groups-per-tenant flag documents 0 as disabling the limit.
func (o *Overrides) RulerMaxRuleGroupsPerTenant(userID string) int {
return o.getOverridesForUser(userID).RulerMaxRuleGroupsPerTenant
}

// StoreGatewayTenantShardSize returns the store-gateway shard size for a given user.
func (o *Overrides) StoreGatewayTenantShardSize(userID string) int {
return o.getOverridesForUser(userID).StoreGatewayTenantShardSize
Expand Down