diff --git a/CHANGELOG.md b/CHANGELOG.md index 5b5b191a975..59d89a2bcc9 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -4,6 +4,7 @@ * [CHANGE] Alertmanager now removes local files after Alertmanager is no longer running for removed or resharded user. #3910 * [CHANGE] Alertmanager now stores local files in per-tenant folders. Files stored by Alertmanager previously are migrated to new hierarchy. Support for this migration will be removed in Cortex 1.10. #3910 +* [FEATURE] Ruler Storage: Added `local` backend support to the ruler storage configuration under the `-ruler-storage.` flag prefix. #3932 * [ENHANCEMENT] Ruler: optimized `/api/v1/rules` and `/api/v1/alerts` when ruler sharding is enabled. #3916 * [ENHANCEMENT] Ruler: added the following metrics when ruler sharding is enabled: #3916 * `cortex_ruler_clients` diff --git a/docs/configuration/config-file-reference.md b/docs/configuration/config-file-reference.md index 9c18e643623..e4b3cad46f2 100644 --- a/docs/configuration/config-file-reference.md +++ b/docs/configuration/config-file-reference.md @@ -155,7 +155,7 @@ tenant_federation: ruler_storage: # Backend storage to use. Supported backends are: s3, gcs, azure, swift, - # filesystem, configdb. + # filesystem, configdb, local. # CLI flag: -ruler-storage.backend [backend: | default = "s3"] @@ -356,6 +356,11 @@ ruler_storage: # The CLI flags prefix for this block config is: ruler-storage [configdb: ] + local: + # Directory to scan for rules + # CLI flag: -ruler-storage.local.directory + [directory: | default = ""] + # The configs_config configures the Cortex Configs DB and API. [configs: ] diff --git a/pkg/cortex/modules.go b/pkg/cortex/modules.go index 5f4d927775d..d4254ad4c60 100644 --- a/pkg/cortex/modules.go +++ b/pkg/cortex/modules.go @@ -642,7 +642,7 @@ func (t *Cortex) initRulerStorage() (serv services.Service, err error) { if !t.Cfg.Ruler.StoreConfig.IsDefaults() { t.RulerStorage, err = ruler.NewLegacyRuleStore(t.Cfg.Ruler.StoreConfig, rules.FileLoader{}, util_log.Logger) } else { - t.RulerStorage, err = ruler.NewRuleStore(context.Background(), t.Cfg.RulerStorage, t.Overrides, util_log.Logger, prometheus.DefaultRegisterer) + t.RulerStorage, err = ruler.NewRuleStore(context.Background(), t.Cfg.RulerStorage, t.Overrides, rules.FileLoader{}, util_log.Logger, prometheus.DefaultRegisterer) } return } diff --git a/pkg/ruler/api.go b/pkg/ruler/api.go index fdf6726169a..05be8ff1193 100644 --- a/pkg/ruler/api.go +++ b/pkg/ruler/api.go @@ -21,7 +21,7 @@ import ( "gopkg.in/yaml.v3" "github.com/cortexproject/cortex/pkg/cortexpb" - store "github.com/cortexproject/cortex/pkg/ruler/rulespb" + "github.com/cortexproject/cortex/pkg/ruler/rulespb" "github.com/cortexproject/cortex/pkg/ruler/rulestore" "github.com/cortexproject/cortex/pkg/tenant" util_log "github.com/cortexproject/cortex/pkg/util/log" @@ -405,7 +405,7 @@ func (a *API) ListRules(w http.ResponseWriter, req *http.Request) { return } - err = a.store.LoadRuleGroups(req.Context(), map[string]rulestore.RuleGroupList{userID: rgs}) + err = a.store.LoadRuleGroups(req.Context(), map[string]rulespb.RuleGroupList{userID: rgs}) if err != nil { http.Error(w, err.Error(), http.StatusBadRequest) return @@ -435,7 +435,7 @@ func (a *API) GetRuleGroup(w http.ResponseWriter, req *http.Request) { return } - formatted := store.FromProto(rg) + formatted := rulespb.FromProto(rg) marshalAndSend(formatted, w, logger) } @@ -495,7 +495,7 @@ func (a *API) CreateRuleGroup(w http.ResponseWriter, req *http.Request) { return } - rgProto := store.ToProto(userID, namespace, rg) + rgProto := rulespb.ToProto(userID, namespace, rg) level.Debug(logger).Log("msg", "attempting to store rulegroup", "userID", userID, "group", rgProto.String()) err = a.store.SetRuleGroup(req.Context(), userID, namespace, rgProto) diff --git a/pkg/ruler/api_test.go b/pkg/ruler/api_test.go index 8752f4f1009..03da76f563a 100644 --- a/pkg/ruler/api_test.go +++ b/pkg/ruler/api_test.go @@ -16,7 +16,7 @@ import ( "github.com/stretchr/testify/require" "github.com/weaveworks/common/user" - "github.com/cortexproject/cortex/pkg/ruler/rulestore" + "github.com/cortexproject/cortex/pkg/ruler/rulespb" "github.com/cortexproject/cortex/pkg/util/services" ) @@ -171,7 +171,7 @@ func TestRuler_alerts(t *testing.T) { } func TestRuler_Create(t *testing.T) { - cfg, cleanup := defaultRulerConfig(newMockRuleStore(make(map[string]rulestore.RuleGroupList))) + cfg, cleanup := defaultRulerConfig(newMockRuleStore(make(map[string]rulespb.RuleGroupList))) defer cleanup() r, rcleanup := newTestRuler(t, cfg) @@ -301,7 +301,7 @@ func TestRuler_DeleteNamespace(t *testing.T) { } func TestRuler_Limits(t *testing.T) { - cfg, cleanup := defaultRulerConfig(newMockRuleStore(make(map[string]rulestore.RuleGroupList))) + cfg, cleanup := defaultRulerConfig(newMockRuleStore(make(map[string]rulespb.RuleGroupList))) defer cleanup() r, rcleanup := newTestRuler(t, cfg) diff --git a/pkg/ruler/manager.go b/pkg/ruler/manager.go index c901a33295f..83e6818bf6e 100644 --- a/pkg/ruler/manager.go +++ b/pkg/ruler/manager.go @@ -19,7 +19,7 @@ import ( "github.com/weaveworks/common/user" "golang.org/x/net/context/ctxhttp" - "github.com/cortexproject/cortex/pkg/ruler/rulestore" + "github.com/cortexproject/cortex/pkg/ruler/rulespb" ) type DefaultMultiTenantManager struct { @@ -91,7 +91,7 @@ func NewDefaultMultiTenantManager(cfg Config, managerFactory ManagerFactory, reg }, nil } -func (r *DefaultMultiTenantManager) SyncRuleGroups(ctx context.Context, ruleGroups map[string]rulestore.RuleGroupList) { +func (r *DefaultMultiTenantManager) SyncRuleGroups(ctx context.Context, ruleGroups map[string]rulespb.RuleGroupList) { // A lock is taken to ensure if this function is called concurrently, then each call // returns after the call map files and check for updates r.userManagerMtx.Lock() @@ -121,7 +121,7 @@ func (r *DefaultMultiTenantManager) SyncRuleGroups(ctx context.Context, ruleGrou // syncRulesToManager maps the rule files to disk, detects any changes and will create/update the // the users Prometheus Rules Manager. -func (r *DefaultMultiTenantManager) syncRulesToManager(ctx context.Context, user string, groups rulestore.RuleGroupList) { +func (r *DefaultMultiTenantManager) syncRulesToManager(ctx context.Context, user string, groups rulespb.RuleGroupList) { // Map the files to disk and return the file names to be passed to the users manager if they // have been updated update, files, err := r.mapper.MapRules(user, groups.Formatted()) diff --git a/pkg/ruler/manager_test.go b/pkg/ruler/manager_test.go index c1afedbcee9..785fa756c40 100644 --- a/pkg/ruler/manager_test.go +++ b/pkg/ruler/manager_test.go @@ -16,7 +16,6 @@ import ( "go.uber.org/atomic" "github.com/cortexproject/cortex/pkg/ruler/rulespb" - "github.com/cortexproject/cortex/pkg/ruler/rulestore" "github.com/cortexproject/cortex/pkg/util/test" ) @@ -32,7 +31,7 @@ func TestSyncRuleGroups(t *testing.T) { const user = "testUser" - userRules := map[string]rulestore.RuleGroupList{ + userRules := map[string]rulespb.RuleGroupList{ user: { &rulespb.RuleGroupDesc{ Name: "group1", diff --git a/pkg/ruler/ruler.go b/pkg/ruler/ruler.go index 92d128dbeac..a4fcd20c50a 100644 --- a/pkg/ruler/ruler.go +++ b/pkg/ruler/ruler.go @@ -170,7 +170,7 @@ func (cfg *Config) RegisterFlags(f *flag.FlagSet) { type MultiTenantManager interface { // SyncRuleGroups is used to sync the Manager with rules from the RuleStore. // If existing user is missing in the ruleGroups map, its ruler manager will be stopped. - SyncRuleGroups(ctx context.Context, ruleGroups map[string]rulestore.RuleGroupList) + SyncRuleGroups(ctx context.Context, ruleGroups map[string]rulespb.RuleGroupList) // GetRules fetches rules for a particular tenant (userID). GetRules(userID string) []*promRules.Group // Stop stops all Manager components. @@ -470,7 +470,7 @@ func (r *Ruler) syncRules(ctx context.Context, reason string) { r.manager.SyncRuleGroups(ctx, configs) } -func (r *Ruler) listRules(ctx context.Context) (map[string]rulestore.RuleGroupList, error) { +func (r *Ruler) listRules(ctx context.Context) (map[string]rulespb.RuleGroupList, error) { switch { case !r.cfg.EnableSharding: return r.listRulesNoSharding(ctx) @@ -486,17 +486,17 @@ func (r *Ruler) listRules(ctx context.Context) (map[string]rulestore.RuleGroupLi } } -func (r *Ruler) listRulesNoSharding(ctx context.Context) (map[string]rulestore.RuleGroupList, error) { +func (r *Ruler) listRulesNoSharding(ctx context.Context) (map[string]rulespb.RuleGroupList, error) { return r.store.ListAllRuleGroups(ctx) } -func (r *Ruler) listRulesShardingDefault(ctx context.Context) (map[string]rulestore.RuleGroupList, error) { +func (r *Ruler) listRulesShardingDefault(ctx context.Context) (map[string]rulespb.RuleGroupList, error) { configs, err := r.store.ListAllRuleGroups(ctx) if err != nil { return nil, err } - filteredConfigs := make(map[string]rulestore.RuleGroupList) + filteredConfigs := make(map[string]rulespb.RuleGroupList) for userID, groups := range configs { filtered := filterRuleGroups(userID, groups, r.ring, r.lifecycler.GetInstanceAddr(), r.logger, r.ringCheckErrors) if len(filtered) > 0 { @@ -506,7 +506,7 @@ func (r *Ruler) listRulesShardingDefault(ctx context.Context) (map[string]rulest return filteredConfigs, nil } -func (r *Ruler) listRulesShuffleSharding(ctx context.Context) (map[string]rulestore.RuleGroupList, error) { +func (r *Ruler) listRulesShuffleSharding(ctx context.Context) (map[string]rulespb.RuleGroupList, error) { users, err := r.store.ListAllUsers(ctx) if err != nil { return nil, errors.Wrap(err, "unable to list users of ruler") @@ -540,7 +540,7 @@ func (r *Ruler) listRulesShuffleSharding(ctx context.Context) (map[string]rulest close(userCh) mu := sync.Mutex{} - result := map[string]rulestore.RuleGroupList{} + result := map[string]rulespb.RuleGroupList{} concurrency := loadRulesConcurrency if len(userRings) < concurrency { diff --git a/pkg/ruler/ruler_test.go b/pkg/ruler/ruler_test.go index 937de887314..0bcd6524f13 100644 --- a/pkg/ruler/ruler_test.go +++ b/pkg/ruler/ruler_test.go @@ -261,15 +261,15 @@ func TestSharding(t *testing.T) { user2Group1Token := tokenForGroup(user2Group1) user3Group1Token := tokenForGroup(user3Group1) - noRules := map[string]rulestore.RuleGroupList{} - allRules := map[string]rulestore.RuleGroupList{ + noRules := map[string]rulespb.RuleGroupList{} + allRules := map[string]rulespb.RuleGroupList{ user1: {user1Group1, user1Group2}, user2: {user2Group1}, user3: {user3Group1}, } // ruler ID -> (user ID -> list of groups). - type expectedRulesMap map[string]map[string]rulestore.RuleGroupList + type expectedRulesMap map[string]map[string]rulespb.RuleGroupList type testCase struct { sharding bool @@ -321,12 +321,12 @@ func TestSharding(t *testing.T) { }, expectedRules: expectedRulesMap{ - ruler1: map[string]rulestore.RuleGroupList{ + ruler1: map[string]rulespb.RuleGroupList{ user1: {user1Group1}, user2: {user2Group1}, }, - ruler2: map[string]rulestore.RuleGroupList{ + ruler2: map[string]rulespb.RuleGroupList{ user1: {user1Group2}, user3: {user3Group1}, }, @@ -349,7 +349,7 @@ func TestSharding(t *testing.T) { expectedRules: expectedRulesMap{ // This ruler doesn't get rules from unhealthy ruler (RF=1). - ruler1: map[string]rulestore.RuleGroupList{ + ruler1: map[string]rulespb.RuleGroupList{ user1: {user1Group1}, user2: {user2Group1}, }, @@ -447,10 +447,10 @@ func TestSharding(t *testing.T) { }, expectedRules: expectedRulesMap{ - ruler1: map[string]rulestore.RuleGroupList{ + ruler1: map[string]rulespb.RuleGroupList{ user1: {user1Group1, user1Group2}, }, - ruler2: map[string]rulestore.RuleGroupList{ + ruler2: map[string]rulespb.RuleGroupList{ user2: {user2Group1}, user3: {user3Group1}, }, @@ -468,13 +468,13 @@ func TestSharding(t *testing.T) { }, expectedRules: expectedRulesMap{ - ruler1: map[string]rulestore.RuleGroupList{ + ruler1: map[string]rulespb.RuleGroupList{ user1: {user1Group1}, }, - ruler2: map[string]rulestore.RuleGroupList{ + ruler2: map[string]rulespb.RuleGroupList{ user1: {user1Group2}, }, - ruler3: map[string]rulestore.RuleGroupList{ + ruler3: map[string]rulespb.RuleGroupList{ user2: {user2Group1}, user3: {user3Group1}, }, @@ -492,11 +492,11 @@ func TestSharding(t *testing.T) { }, expectedRules: expectedRulesMap{ - ruler1: map[string]rulestore.RuleGroupList{ + ruler1: map[string]rulespb.RuleGroupList{ user1: {user1Group1, user1Group2}, }, ruler2: noRules, // Ruler2 owns token for user2group1, but user-2 will only be handled by ruler-1 and 3. - ruler3: map[string]rulestore.RuleGroupList{ + ruler3: map[string]rulespb.RuleGroupList{ user2: {user2Group1}, user3: {user3Group1}, }, @@ -583,7 +583,7 @@ func TestSharding(t *testing.T) { require.NoError(t, err) // Normalize nil map to empty one. if loaded == nil { - loaded = map[string]rulestore.RuleGroupList{} + loaded = map[string]rulespb.RuleGroupList{} } expected[id] = loaded } diff --git a/pkg/ruler/rulespb/custom.go b/pkg/ruler/rulespb/custom.go new file mode 100644 index 00000000000..b0043092829 --- /dev/null +++ b/pkg/ruler/rulespb/custom.go @@ -0,0 +1,21 @@ +package rulespb + +import "github.com/prometheus/prometheus/pkg/rulefmt" + +// RuleGroupList contains a set of rule groups +type RuleGroupList []*RuleGroupDesc + +// Formatted returns the rule group list as a set of formatted rule groups mapped +// by namespace +func (l RuleGroupList) Formatted() map[string][]rulefmt.RuleGroup { + ruleMap := map[string][]rulefmt.RuleGroup{} + for _, g := range l { + if _, exists := ruleMap[g.Namespace]; !exists { + ruleMap[g.Namespace] = []rulefmt.RuleGroup{FromProto(g)} + continue + } + ruleMap[g.Namespace] = append(ruleMap[g.Namespace], FromProto(g)) + + } + return ruleMap +} diff --git a/pkg/ruler/rulestore/bucketclient/bucket_client.go b/pkg/ruler/rulestore/bucketclient/bucket_client.go index a655302c3a8..9f996307a13 100644 --- a/pkg/ruler/rulestore/bucketclient/bucket_client.go +++ b/pkg/ruler/rulestore/bucketclient/bucket_client.go @@ -97,8 +97,8 @@ func (b *BucketRuleStore) ListAllUsers(ctx context.Context) ([]string, error) { } // ListAllRuleGroups implements rules.RuleStore. -func (b *BucketRuleStore) ListAllRuleGroups(ctx context.Context) (map[string]rulestore.RuleGroupList, error) { - out := map[string]rulestore.RuleGroupList{} +func (b *BucketRuleStore) ListAllRuleGroups(ctx context.Context) (map[string]rulespb.RuleGroupList, error) { + out := map[string]rulespb.RuleGroupList{} // List rule groups for all tenants. err := b.bucket.Iter(ctx, "", func(key string) error { @@ -126,10 +126,10 @@ func (b *BucketRuleStore) ListAllRuleGroups(ctx context.Context) (map[string]rul } // ListRuleGroupsForUserAndNamespace implements rules.RuleStore. -func (b *BucketRuleStore) ListRuleGroupsForUserAndNamespace(ctx context.Context, userID string, namespace string) (rulestore.RuleGroupList, error) { +func (b *BucketRuleStore) ListRuleGroupsForUserAndNamespace(ctx context.Context, userID string, namespace string) (rulespb.RuleGroupList, error) { userBucket := bucket.NewUserBucketClient(userID, b.bucket, b.cfgProvider) - groupList := rulestore.RuleGroupList{} + groupList := rulespb.RuleGroupList{} // The prefix to list objects depends on whether the namespace has been // specified in the request. @@ -162,7 +162,7 @@ func (b *BucketRuleStore) ListRuleGroupsForUserAndNamespace(ctx context.Context, } // LoadRuleGroups implements rules.RuleStore. -func (b *BucketRuleStore) LoadRuleGroups(ctx context.Context, groupsToLoad map[string]rulestore.RuleGroupList) error { +func (b *BucketRuleStore) LoadRuleGroups(ctx context.Context, groupsToLoad map[string]rulespb.RuleGroupList) error { ch := make(chan *rulespb.RuleGroupDesc) // Given we store one file per rule group. With this, we create a pool of workers that will diff --git a/pkg/ruler/rulestore/config.go b/pkg/ruler/rulestore/config.go index 2e28b678485..d44843bab8e 100644 --- a/pkg/ruler/rulestore/config.go +++ b/pkg/ruler/rulestore/config.go @@ -4,25 +4,24 @@ import ( "flag" "github.com/cortexproject/cortex/pkg/configs/client" + "github.com/cortexproject/cortex/pkg/ruler/rulestore/configdb" + "github.com/cortexproject/cortex/pkg/ruler/rulestore/local" "github.com/cortexproject/cortex/pkg/storage/bucket" ) -const ( - ConfigDB = "configdb" - - Name = "ruler-storage" - prefix = "ruler-storage." -) - // Config configures a rule store. type Config struct { bucket.Config `yaml:",inline"` ConfigDB client.Config `yaml:"configdb"` + Local local.Config `yaml:"local"` } // RegisterFlags registers the backend storage config. func (cfg *Config) RegisterFlags(f *flag.FlagSet) { - cfg.ExtraBackends = []string{ConfigDB} + prefix := "ruler-storage." + + cfg.ExtraBackends = []string{configdb.Name, local.Name} cfg.ConfigDB.RegisterFlagsWithPrefix(prefix, f) + cfg.Local.RegisterFlagsWithPrefix(prefix, f) cfg.RegisterFlagsWithPrefix(prefix, f) } diff --git a/pkg/ruler/rulestore/configdb/store.go b/pkg/ruler/rulestore/configdb/store.go index 6f1a8e7dcee..5d125a920d9 100644 --- a/pkg/ruler/rulestore/configdb/store.go +++ b/pkg/ruler/rulestore/configdb/store.go @@ -7,14 +7,17 @@ import ( "github.com/cortexproject/cortex/pkg/configs/client" "github.com/cortexproject/cortex/pkg/configs/userconfig" "github.com/cortexproject/cortex/pkg/ruler/rulespb" - "github.com/cortexproject/cortex/pkg/ruler/rulestore" +) + +const ( + Name = "configdb" ) // ConfigRuleStore is a concrete implementation of RuleStore that sources rules from the config service type ConfigRuleStore struct { configClient client.Client since userconfig.ID - ruleGroupList map[string]rulestore.RuleGroupList + ruleGroupList map[string]rulespb.RuleGroupList } func (c *ConfigRuleStore) SupportsModifications() bool { @@ -26,7 +29,7 @@ func NewConfigRuleStore(c client.Client) *ConfigRuleStore { return &ConfigRuleStore{ configClient: c, since: 0, - ruleGroupList: make(map[string]rulestore.RuleGroupList), + ruleGroupList: make(map[string]rulespb.RuleGroupList), } } @@ -43,7 +46,7 @@ func (c *ConfigRuleStore) ListAllUsers(ctx context.Context) ([]string, error) { } // ListAllRuleGroups implements RuleStore -func (c *ConfigRuleStore) ListAllRuleGroups(ctx context.Context) (map[string]rulestore.RuleGroupList, error) { +func (c *ConfigRuleStore) ListAllRuleGroups(ctx context.Context) (map[string]rulespb.RuleGroupList, error) { configs, err := c.configClient.GetRules(ctx, c.since) if err != nil { @@ -51,7 +54,7 @@ func (c *ConfigRuleStore) ListAllRuleGroups(ctx context.Context) (map[string]rul } for user, cfg := range configs { - userRules := rulestore.RuleGroupList{} + userRules := rulespb.RuleGroupList{} if cfg.IsDeleted() { delete(c.ruleGroupList, user) continue @@ -85,7 +88,7 @@ func getLatestConfigID(cfgs map[string]userconfig.VersionedRulesConfig, latest u return ret } -func (c *ConfigRuleStore) ListRuleGroupsForUserAndNamespace(ctx context.Context, userID string, namespace string) (rulestore.RuleGroupList, error) { +func (c *ConfigRuleStore) ListRuleGroupsForUserAndNamespace(ctx context.Context, userID string, namespace string) (rulespb.RuleGroupList, error) { r, err := c.ListAllRuleGroups(ctx) if err != nil { return nil, err @@ -107,7 +110,7 @@ func (c *ConfigRuleStore) ListRuleGroupsForUserAndNamespace(ctx context.Context, return list, nil } -func (c *ConfigRuleStore) LoadRuleGroups(ctx context.Context, groupsToLoad map[string]rulestore.RuleGroupList) error { +func (c *ConfigRuleStore) LoadRuleGroups(ctx context.Context, groupsToLoad map[string]rulespb.RuleGroupList) error { // Since ConfigRuleStore already Loads the rules in the List methods, there is nothing left to do here. return nil } diff --git a/pkg/ruler/rulestore/local/local.go b/pkg/ruler/rulestore/local/local.go index 2daf79e6a06..f3061e1f280 100644 --- a/pkg/ruler/rulestore/local/local.go +++ b/pkg/ruler/rulestore/local/local.go @@ -11,7 +11,10 @@ import ( promRules "github.com/prometheus/prometheus/rules" "github.com/cortexproject/cortex/pkg/ruler/rulespb" - "github.com/cortexproject/cortex/pkg/ruler/rulestore" +) + +const ( + Name = "local" ) type Config struct { @@ -70,13 +73,13 @@ func (l *Client) ListAllUsers(ctx context.Context) ([]string, error) { } // ListAllRuleGroups implements rules.RuleStore. This method also loads the rules. -func (l *Client) ListAllRuleGroups(ctx context.Context) (map[string]rulestore.RuleGroupList, error) { +func (l *Client) ListAllRuleGroups(ctx context.Context) (map[string]rulespb.RuleGroupList, error) { users, err := l.ListAllUsers(ctx) if err != nil { return nil, err } - lists := make(map[string]rulestore.RuleGroupList) + lists := make(map[string]rulespb.RuleGroupList) for _, user := range users { list, err := l.loadAllRulesGroupsForUser(ctx, user) if err != nil { @@ -90,7 +93,7 @@ func (l *Client) ListAllRuleGroups(ctx context.Context) (map[string]rulestore.Ru } // ListRuleGroupsForUserAndNamespace implements rules.RuleStore. This method also loads the rules. -func (l *Client) ListRuleGroupsForUserAndNamespace(ctx context.Context, userID string, namespace string) (rulestore.RuleGroupList, error) { +func (l *Client) ListRuleGroupsForUserAndNamespace(ctx context.Context, userID string, namespace string) (rulespb.RuleGroupList, error) { if namespace != "" { return l.loadAllRulesGroupsForUserAndNamespace(ctx, userID, namespace) } @@ -98,7 +101,7 @@ func (l *Client) ListRuleGroupsForUserAndNamespace(ctx context.Context, userID s return l.loadAllRulesGroupsForUser(ctx, userID) } -func (l *Client) LoadRuleGroups(_ context.Context, _ map[string]rulestore.RuleGroupList) error { +func (l *Client) LoadRuleGroups(_ context.Context, _ map[string]rulespb.RuleGroupList) error { // This Client already loads the rules in its List methods, there is nothing left to do here. return nil } @@ -123,8 +126,8 @@ func (l *Client) DeleteNamespace(ctx context.Context, userID, namespace string) return errors.New("DeleteNamespace unsupported in rule local store") } -func (l *Client) loadAllRulesGroupsForUser(ctx context.Context, userID string) (rulestore.RuleGroupList, error) { - var allLists rulestore.RuleGroupList +func (l *Client) loadAllRulesGroupsForUser(ctx context.Context, userID string) (rulespb.RuleGroupList, error) { + var allLists rulespb.RuleGroupList root := filepath.Join(l.cfg.Directory, userID) infos, err := ioutil.ReadDir(root) @@ -159,7 +162,7 @@ func (l *Client) loadAllRulesGroupsForUser(ctx context.Context, userID string) ( return allLists, nil } -func (l *Client) loadAllRulesGroupsForUserAndNamespace(_ context.Context, userID string, namespace string) (rulestore.RuleGroupList, error) { +func (l *Client) loadAllRulesGroupsForUserAndNamespace(_ context.Context, userID string, namespace string) (rulespb.RuleGroupList, error) { filename := filepath.Join(l.cfg.Directory, userID, namespace) rulegroups, allErrors := l.loader.Load(filename) @@ -167,7 +170,7 @@ func (l *Client) loadAllRulesGroupsForUserAndNamespace(_ context.Context, userID return nil, errors.Wrapf(allErrors[0], "error parsing %s", filename) } - var list rulestore.RuleGroupList + var list rulespb.RuleGroupList for _, group := range rulegroups.Groups { desc := rulespb.ToProto(userID, namespace, group) diff --git a/pkg/ruler/rulestore/objectclient/rule_store.go b/pkg/ruler/rulestore/objectclient/rule_store.go index d68485c9836..a61ebba411b 100644 --- a/pkg/ruler/rulestore/objectclient/rule_store.go +++ b/pkg/ruler/rulestore/objectclient/rule_store.go @@ -104,7 +104,7 @@ func (o *RuleStore) ListAllUsers(ctx context.Context) ([]string, error) { } // ListAllRuleGroups implements rules.RuleStore. -func (o *RuleStore) ListAllRuleGroups(ctx context.Context) (map[string]rulestore.RuleGroupList, error) { +func (o *RuleStore) ListAllRuleGroups(ctx context.Context) (map[string]rulespb.RuleGroupList, error) { // No delimiter to get *all* rule groups for all users and namespaces. ruleGroupObjects, _, err := o.client.List(ctx, rulePrefix, "") if err != nil { @@ -114,7 +114,7 @@ func (o *RuleStore) ListAllRuleGroups(ctx context.Context) (map[string]rulestore return convertRuleGroupObjectsToMap(ruleGroupObjects), nil } -func (o *RuleStore) ListRuleGroupsForUserAndNamespace(ctx context.Context, userID, namespace string) (rulestore.RuleGroupList, error) { +func (o *RuleStore) ListRuleGroupsForUserAndNamespace(ctx context.Context, userID, namespace string) (rulespb.RuleGroupList, error) { ruleGroupObjects, _, err := o.client.List(ctx, generateRuleObjectKey(userID, namespace, ""), "") if err != nil { return nil, err @@ -123,7 +123,7 @@ func (o *RuleStore) ListRuleGroupsForUserAndNamespace(ctx context.Context, userI return convertRuleGroupObjectsToMap(ruleGroupObjects)[userID], nil } -func (o *RuleStore) LoadRuleGroups(ctx context.Context, groupsToLoad map[string]rulestore.RuleGroupList) error { +func (o *RuleStore) LoadRuleGroups(ctx context.Context, groupsToLoad map[string]rulespb.RuleGroupList) error { ch := make(chan *rulespb.RuleGroupDesc) // Given we store one file per rule group. With this, we create a pool of workers that will @@ -176,8 +176,8 @@ outer: return g.Wait() } -func convertRuleGroupObjectsToMap(ruleGroupObjects []chunk.StorageObject) map[string]rulestore.RuleGroupList { - result := map[string]rulestore.RuleGroupList{} +func convertRuleGroupObjectsToMap(ruleGroupObjects []chunk.StorageObject) map[string]rulespb.RuleGroupList { + result := map[string]rulespb.RuleGroupList{} for _, rg := range ruleGroupObjects { user, namespace, group := decomposeRuleObjectKey(rg.Key) if user == "" || namespace == "" || group == "" { diff --git a/pkg/ruler/rulestore/store.go b/pkg/ruler/rulestore/store.go index 4bc443659d7..d8b97ed05a7 100644 --- a/pkg/ruler/rulestore/store.go +++ b/pkg/ruler/rulestore/store.go @@ -4,8 +4,6 @@ import ( "context" "errors" - "github.com/prometheus/prometheus/pkg/rulefmt" - "github.com/cortexproject/cortex/pkg/ruler/rulespb" ) @@ -26,16 +24,16 @@ type RuleStore interface { ListAllUsers(ctx context.Context) ([]string, error) // ListAllRuleGroups returns all rule groups for all users. - ListAllRuleGroups(ctx context.Context) (map[string]RuleGroupList, error) + ListAllRuleGroups(ctx context.Context) (map[string]rulespb.RuleGroupList, error) // ListRuleGroupsForUserAndNamespace returns all the active rule groups for a user from given namespace. // If namespace is empty, groups from all namespaces are returned. - ListRuleGroupsForUserAndNamespace(ctx context.Context, userID string, namespace string) (RuleGroupList, error) + ListRuleGroupsForUserAndNamespace(ctx context.Context, userID string, namespace string) (rulespb.RuleGroupList, error) // LoadRuleGroups loads rules for each rule group in the map. // Parameter with groups to load *MUST* be coming from one of the List methods. // Reason is that some implementations don't do anything, since their List method already loads the rules. - LoadRuleGroups(ctx context.Context, groupsToLoad map[string]RuleGroupList) error + LoadRuleGroups(ctx context.Context, groupsToLoad map[string]rulespb.RuleGroupList) error GetRuleGroup(ctx context.Context, userID, namespace, group string) (*rulespb.RuleGroupDesc, error) SetRuleGroup(ctx context.Context, userID, namespace string, group *rulespb.RuleGroupDesc) error @@ -47,21 +45,3 @@ type RuleStore interface { // If namespace is empty, deletes all rule groups for user. DeleteNamespace(ctx context.Context, userID, namespace string) error } - -// RuleGroupList contains a set of rule groups -type RuleGroupList []*rulespb.RuleGroupDesc - -// Formatted returns the rule group list as a set of formatted rule groups mapped -// by namespace -func (l RuleGroupList) Formatted() map[string][]rulefmt.RuleGroup { - ruleMap := map[string][]rulefmt.RuleGroup{} - for _, g := range l { - if _, exists := ruleMap[g.Namespace]; !exists { - ruleMap[g.Namespace] = []rulefmt.RuleGroup{rulespb.FromProto(g)} - continue - } - ruleMap[g.Namespace] = append(ruleMap[g.Namespace], rulespb.FromProto(g)) - - } - return ruleMap -} diff --git a/pkg/ruler/storage.go b/pkg/ruler/storage.go index 642871192e3..6193fbdc369 100644 --- a/pkg/ruler/storage.go +++ b/pkg/ruler/storage.go @@ -115,8 +115,8 @@ func NewLegacyRuleStore(cfg RuleStoreConfig, loader promRules.GroupLoader, logge } // NewRuleStore returns a rule store backend client based on the provided cfg. -func NewRuleStore(ctx context.Context, cfg rulestore.Config, cfgProvider bucket.TenantConfigProvider, logger log.Logger, reg prometheus.Registerer) (rulestore.RuleStore, error) { - if cfg.Backend == rulestore.ConfigDB { +func NewRuleStore(ctx context.Context, cfg rulestore.Config, cfgProvider bucket.TenantConfigProvider, loader promRules.GroupLoader, logger log.Logger, reg prometheus.Registerer) (rulestore.RuleStore, error) { + if cfg.Backend == configdb.Name { c, err := client.New(cfg.ConfigDB) if err != nil { @@ -126,7 +126,11 @@ func NewRuleStore(ctx context.Context, cfg rulestore.Config, cfgProvider bucket. return configdb.NewConfigRuleStore(c), nil } - bucketClient, err := bucket.NewClient(ctx, cfg.Config, rulestore.Name, logger, reg) + if cfg.Backend == local.Name { + return local.NewLocalRulesClient(cfg.Local, loader) + } + + bucketClient, err := bucket.NewClient(ctx, cfg.Config, "ruler-storage", logger, reg) if err != nil { return nil, err } diff --git a/pkg/ruler/store_mock_test.go b/pkg/ruler/store_mock_test.go index 9096604a78d..0da5a992e22 100644 --- a/pkg/ruler/store_mock_test.go +++ b/pkg/ruler/store_mock_test.go @@ -11,13 +11,13 @@ import ( ) type mockRuleStore struct { - rules map[string]rulestore.RuleGroupList + rules map[string]rulespb.RuleGroupList mtx sync.Mutex } var ( interval, _ = time.ParseDuration("1m") - mockRulesNamespaces = map[string]rulestore.RuleGroupList{ + mockRulesNamespaces = map[string]rulespb.RuleGroupList{ "user1": { &rulespb.RuleGroupDesc{ Name: "group1", @@ -53,7 +53,7 @@ var ( }, }, } - mockRules = map[string]rulestore.RuleGroupList{ + mockRules = map[string]rulespb.RuleGroupList{ "user1": { &rulespb.RuleGroupDesc{ Name: "group1", @@ -88,7 +88,7 @@ var ( }, } - mockSpecialCharRules = map[string]rulestore.RuleGroupList{ + mockSpecialCharRules = map[string]rulespb.RuleGroupList{ "user1": { &rulespb.RuleGroupDesc{ Name: ")(_+?/|group1+/?", @@ -110,7 +110,7 @@ var ( } ) -func newMockRuleStore(rules map[string]rulestore.RuleGroupList) *mockRuleStore { +func newMockRuleStore(rules map[string]rulespb.RuleGroupList) *mockRuleStore { return &mockRuleStore{ rules: rules, } @@ -127,32 +127,32 @@ func (m *mockRuleStore) ListAllUsers(_ context.Context) ([]string, error) { return result, nil } -func (m *mockRuleStore) ListAllRuleGroups(_ context.Context) (map[string]rulestore.RuleGroupList, error) { +func (m *mockRuleStore) ListAllRuleGroups(_ context.Context) (map[string]rulespb.RuleGroupList, error) { m.mtx.Lock() defer m.mtx.Unlock() - result := make(map[string]rulestore.RuleGroupList) + result := make(map[string]rulespb.RuleGroupList) for k, v := range m.rules { - result[k] = append(rulestore.RuleGroupList(nil), v...) + result[k] = append(rulespb.RuleGroupList(nil), v...) } return result, nil } -func (m *mockRuleStore) ListRuleGroupsForUserAndNamespace(_ context.Context, userID, namespace string) (rulestore.RuleGroupList, error) { +func (m *mockRuleStore) ListRuleGroupsForUserAndNamespace(_ context.Context, userID, namespace string) (rulespb.RuleGroupList, error) { m.mtx.Lock() defer m.mtx.Unlock() userRules, exists := m.rules[userID] if !exists { - return rulestore.RuleGroupList{}, nil + return rulespb.RuleGroupList{}, nil } if namespace == "" { return userRules, nil } - namespaceRules := rulestore.RuleGroupList{} + namespaceRules := rulespb.RuleGroupList{} for _, rg := range userRules { if rg.Namespace == namespace { @@ -161,13 +161,13 @@ func (m *mockRuleStore) ListRuleGroupsForUserAndNamespace(_ context.Context, use } if len(namespaceRules) == 0 { - return rulestore.RuleGroupList{}, nil + return rulespb.RuleGroupList{}, nil } return namespaceRules, nil } -func (m *mockRuleStore) LoadRuleGroups(ctx context.Context, groupsToLoad map[string]rulestore.RuleGroupList) error { +func (m *mockRuleStore) LoadRuleGroups(ctx context.Context, groupsToLoad map[string]rulespb.RuleGroupList) error { // Nothing to do, as mockRuleStore already returns groups with loaded rules. return nil } @@ -200,7 +200,7 @@ func (m *mockRuleStore) SetRuleGroup(ctx context.Context, userID string, namespa userRules, exists := m.rules[userID] if !exists { - userRules = rulestore.RuleGroupList{} + userRules = rulespb.RuleGroupList{} m.rules[userID] = userRules } @@ -225,7 +225,7 @@ func (m *mockRuleStore) DeleteRuleGroup(ctx context.Context, userID string, name userRules, exists := m.rules[userID] if !exists { - userRules = rulestore.RuleGroupList{} + userRules = rulespb.RuleGroupList{} m.rules[userID] = userRules } @@ -249,7 +249,7 @@ func (m *mockRuleStore) DeleteNamespace(ctx context.Context, userID, namespace s userRules, exists := m.rules[userID] if !exists { - userRules = rulestore.RuleGroupList{} + userRules = rulespb.RuleGroupList{} m.rules[userID] = userRules }