diff --git a/pkg/ruler/manager.go b/pkg/ruler/manager.go index 4d7574cfebb..e3396cf862f 100644 --- a/pkg/ruler/manager.go +++ b/pkg/ruler/manager.go @@ -64,6 +64,8 @@ type DefaultMultiTenantManager struct { ruleCache map[string][]*promRules.Group ruleCacheMtx sync.RWMutex syncRuleMtx sync.Mutex + + ruleGroupIterationFunc promRules.GroupEvalIterationFunc } func NewDefaultMultiTenantManager(cfg Config, limits RulesLimits, managerFactory ManagerFactory, evalMetrics *RuleEvalMetrics, reg prometheus.Registerer, logger log.Logger) (*DefaultMultiTenantManager, error) { @@ -122,8 +124,9 @@ func NewDefaultMultiTenantManager(cfg Config, limits RulesLimits, managerFactory Name: "ruler_config_updates_total", Help: "Total number of config updates triggered by a user", }, []string{"user"}), - registry: reg, - logger: logger, + registry: reg, + logger: logger, + ruleGroupIterationFunc: defaultRuleGroupIterationFunc, } if cfg.RulesBackupEnabled() { m.rulesBackupManager = newRulesBackupManager(cfg, logger, reg) @@ -131,6 +134,15 @@ func NewDefaultMultiTenantManager(cfg Config, limits RulesLimits, managerFactory return m, nil } +func NewDefaultMultiTenantManagerWithIterationFunc(iterFunc promRules.GroupEvalIterationFunc, cfg Config, limits RulesLimits, managerFactory ManagerFactory, evalMetrics *RuleEvalMetrics, reg prometheus.Registerer, logger log.Logger) (*DefaultMultiTenantManager, error) { + manager, err := NewDefaultMultiTenantManager(cfg, limits, managerFactory, evalMetrics, reg, logger) + if err != nil { + return nil, err + } + manager.ruleGroupIterationFunc = iterFunc + return manager, nil +} + func (r *DefaultMultiTenantManager) SyncRuleGroups(ctx context.Context, ruleGroups map[string]rulespb.RuleGroupList) { // this is a safety lock to ensure this method is executed sequentially r.syncRuleMtx.Lock() @@ -214,7 +226,7 @@ func (r *DefaultMultiTenantManager) syncRulesToManager(ctx context.Context, user if (rulesUpdated || externalLabelsUpdated) && existing { r.updateRuleCache(user, manager.RuleGroups()) } - err = manager.Update(r.cfg.EvaluationInterval, files, externalLabels, r.cfg.ExternalURL.String(), ruleGroupIterationFunc) + err = manager.Update(r.cfg.EvaluationInterval, files, externalLabels, r.cfg.ExternalURL.String(), r.ruleGroupIterationFunc) r.deleteRuleCache(user) if err != nil { r.lastReloadSuccessful.WithLabelValues(user).Set(0) @@ -257,7 +269,7 @@ func (r *DefaultMultiTenantManager) createRulesManager(user string, ctx context. return manager } -func ruleGroupIterationFunc(ctx context.Context, g *promRules.Group, evalTimestamp time.Time) { +func defaultRuleGroupIterationFunc(ctx context.Context, g *promRules.Group, evalTimestamp time.Time) { logMessage := []interface{}{ "component", "ruler", "rule_group", g.Name(), diff --git a/pkg/ruler/ruler_test.go b/pkg/ruler/ruler_test.go index ec7eb287c30..61757377027 100644 --- a/pkg/ruler/ruler_test.go +++ b/pkg/ruler/ruler_test.go @@ -340,6 +340,26 @@ func buildRuler(t *testing.T, rulerConfig Config, querierTestConfig *querier.Tes return ruler, manager } +func buildRulerWithIterFunc(t *testing.T, rulerConfig Config, querierTestConfig *querier.TestConfig, store rulestore.RuleStore, rulerAddrMap map[string]*Ruler, ruleGroupIterFunc promRules.GroupEvalIterationFunc) (*Ruler, *DefaultMultiTenantManager) { + engine, queryable, pusher, logger, overrides, reg := testSetup(t, querierTestConfig) + metrics := NewRuleEvalMetrics(rulerConfig, reg) + managerFactory := DefaultTenantManagerFactory(rulerConfig, pusher, queryable, engine, overrides, metrics, reg) + manager, err := NewDefaultMultiTenantManagerWithIterationFunc(ruleGroupIterFunc, rulerConfig, &ruleLimits{}, managerFactory, metrics, reg, log.NewNopLogger()) + require.NoError(t, err) + + ruler, err := newRuler( + rulerConfig, + manager, + reg, + logger, + store, + overrides, + newMockClientsPool(rulerConfig, logger, reg, rulerAddrMap), + ) + require.NoError(t, err) + return ruler, manager +} + func newTestRuler(t *testing.T, rulerConfig Config, store rulestore.RuleStore, querierTestConfig *querier.TestConfig) *Ruler { ruler, _ := buildRuler(t, rulerConfig, querierTestConfig, store, nil) require.NoError(t, services.StartAndAwaitRunning(context.Background(), ruler)) @@ -2776,8 +2796,10 @@ func TestRecoverAlertsPostOutage(t *testing.T) { querier.UseAlwaysQueryable(newEmptyQueryable()), } - // create a ruler but don't start it. instead, we'll evaluate the rule groups manually. - r, _ := buildRuler(t, rulerCfg, &querier.TestConfig{Cfg: querierConfig, Distributor: d, Stores: queryables}, store, nil) + // Define a no-op GroupEvalIterationFunc to avoid races between the scheduled Eval() execution and the evaluations invoked by this test. + evalFunc := func(ctx context.Context, g *promRules.Group, evalTimestamp time.Time) {} + + r, _ := buildRulerWithIterFunc(t, rulerCfg, &querier.TestConfig{Cfg: querierConfig, Distributor: d, Stores: queryables}, store, nil, evalFunc) r.syncRules(context.Background(), rulerSyncReasonInitial) // assert initial state of rule group