Skip to content

Commit 8994a7c

Browse files
authored
Added option to enable/disable specific users for compaction. (#3385)
* Added option to enable/disable specific users for compaction. Signed-off-by: Peter Štibraný <[email protected]>
* Log list of enabled/disabled users on startup. Signed-off-by: Peter Štibraný <[email protected]>
* Always check if compactor owns the user. Signed-off-by: Peter Štibraný <[email protected]>
* Make new flags visible in the documentation. Added CHANGELOG. Signed-off-by: Peter Štibraný <[email protected]>
1 parent fd0548e commit 8994a7c

File tree

5 files changed

+125
-9
lines changed

5 files changed

+125
-9
lines changed

CHANGELOG.md

Lines changed: 1 addition & 0 deletions
Original file line number | Diff line number | Diff line change
@@ -87,6 +87,7 @@
8787
* [ENHANCEMENT] Added `cortex_alertmanager_config_last_reload_successful_seconds` metric to show timestamp of last successful AM config reload. #3289
8888
* [ENHANCEMENT] Blocks storage: reduced number of bucket listing operations to list block content (applies to newly created blocks only). #3363
8989
* [ENHANCEMENT] Ruler: Include the tenant ID on the notifier logs. #3372
90+
* [ENHANCEMENT] Blocks storage Compactor: Added `-compactor.enabled-tenants` and `-compactor.disabled-tenants` to explicitly enable or disable compaction of specific tenants. #3385
9091
* [BUGFIX] No-longer-needed ingester operations for queries triggered by queriers and rulers are now canceled. #3178
9192
* [BUGFIX] Ruler: directories in the configured `rules-path` will be removed on startup and shutdown in order to ensure they don't persist between runs. #3195
9293
* [BUGFIX] Handle hash-collisions in the query path. #3192

docs/blocks-storage/compactor.md

Lines changed: 12 additions & 0 deletions
Original file line number | Diff line number | Diff line change
@@ -124,6 +124,18 @@ compactor:
124124
# CLI flag: -compactor.deletion-delay
125125
[deletion_delay: <duration> | default = 12h]
126126

127+
# Comma separated list of tenants that can be compacted. If specified, only
128+
# these tenants will be compacted by compactor, otherwise all tenants can be
129+
# compacted. Subject to sharding.
130+
# CLI flag: -compactor.enabled-tenants
131+
[enabled_tenants: <string> | default = ""]
132+
133+
# Comma separated list of tenants that cannot be compacted by this compactor.
134+
# If specified, and compactor would normally pick given tenant for compaction
135+
# (via -compactor.enabled-tenants or sharding), it will be ignored instead.
136+
# CLI flag: -compactor.disabled-tenants
137+
[disabled_tenants: <string> | default = ""]
138+
127139
# Shard tenants across multiple compactor instances. Sharding is required if
128140
# you run multiple compactor instances, in order to coordinate compactions and
129141
# avoid race conditions leading to the same tenant blocks simultaneously

docs/configuration/config-file-reference.md

Lines changed: 12 additions & 0 deletions
Original file line number | Diff line number | Diff line change
@@ -3681,6 +3681,18 @@ The `compactor_config` configures the compactor for the blocks storage.
36813681
# CLI flag: -compactor.deletion-delay
36823682
[deletion_delay: <duration> | default = 12h]
36833683
3684+
# Comma separated list of tenants that can be compacted. If specified, only
3685+
# these tenants will be compacted by compactor, otherwise all tenants can be
3686+
# compacted. Subject to sharding.
3687+
# CLI flag: -compactor.enabled-tenants
3688+
[enabled_tenants: <string> | default = ""]
3689+
3690+
# Comma separated list of tenants that cannot be compacted by this compactor. If
3691+
# specified, and compactor would normally pick given tenant for compaction (via
3692+
# -compactor.enabled-tenants or sharding), it will be ignored instead.
3693+
# CLI flag: -compactor.disabled-tenants
3694+
[disabled_tenants: <string> | default = ""]
3695+
36843696
# Shard tenants across multiple compactor instances. Sharding is required if you
36853697
# run multiple compactor instances, in order to coordinate compactions and avoid
36863698
# race conditions leading to the same tenant blocks simultaneously compacted by

pkg/compactor/compactor.go

Lines changed: 58 additions & 9 deletions
Original file line number | Diff line number | Diff line change
@@ -24,6 +24,7 @@ import (
2424
"github.com/cortexproject/cortex/pkg/ring"
2525
cortex_tsdb "github.com/cortexproject/cortex/pkg/storage/tsdb"
2626
"github.com/cortexproject/cortex/pkg/util"
27+
"github.com/cortexproject/cortex/pkg/util/flagext"
2728
"github.com/cortexproject/cortex/pkg/util/services"
2829
)
2930

@@ -39,6 +40,9 @@ type Config struct {
3940
CompactionConcurrency int `yaml:"compaction_concurrency"`
4041
DeletionDelay time.Duration `yaml:"deletion_delay"`
4142

43+
EnabledTenants flagext.StringSliceCSV `yaml:"enabled_tenants"`
44+
DisabledTenants flagext.StringSliceCSV `yaml:"disabled_tenants"`
45+
4246
// Compactors sharding.
4347
ShardingEnabled bool `yaml:"sharding_enabled"`
4448
ShardingRing RingConfig `yaml:"sharding_ring"`
@@ -71,6 +75,9 @@ func (cfg *Config) RegisterFlags(f *flag.FlagSet) {
7175
"If not 0, blocks will be marked for deletion and compactor component will delete blocks marked for deletion from the bucket. "+
7276
"If delete-delay is 0, blocks will be deleted straight away. Note that deleting blocks immediately can cause query failures, "+
7377
"if store gateway still has the block loaded, or compactor is ignoring the deletion because it's compacting the block at the same time.")
78+
79+
f.Var(&cfg.EnabledTenants, "compactor.enabled-tenants", "Comma separated list of tenants that can be compacted. If specified, only these tenants will be compacted by compactor, otherwise all tenants can be compacted. Subject to sharding.")
80+
f.Var(&cfg.DisabledTenants, "compactor.disabled-tenants", "Comma separated list of tenants that cannot be compacted by this compactor. If specified, and compactor would normally pick given tenant for compaction (via -compactor.enabled-tenants or sharding), it will be ignored instead.")
7481
}
7582

7683
// Compactor is a multi-tenant TSDB blocks compactor based on Thanos.
@@ -83,6 +90,12 @@ type Compactor struct {
8390
parentLogger log.Logger
8491
registerer prometheus.Registerer
8592

93+
// If empty, all users are enabled. If not empty, only users in the map are enabled (possibly owned by compactor, also subject to sharding configuration).
94+
enabledUsers map[string]struct{}
95+
96+
// If empty, no users are disabled. If not empty, users in the map are disabled (not owned by this compactor).
97+
disabledUsers map[string]struct{}
98+
8699
// Function that creates bucket client and TSDB compactor using the context.
87100
// Useful for injecting mock objects from tests.
88101
createBucketClientAndTsdbCompactor func(ctx context.Context) (objstore.Bucket, tsdb.Compactor, error)
@@ -179,6 +192,24 @@ func newCompactor(
179192
}),
180193
}
181194

195+
if len(compactorCfg.EnabledTenants) > 0 {
196+
c.enabledUsers = map[string]struct{}{}
197+
for _, u := range compactorCfg.EnabledTenants {
198+
c.enabledUsers[u] = struct{}{}
199+
}
200+
201+
level.Info(c.logger).Log("msg", "using enabled users", "enabled", strings.Join(compactorCfg.EnabledTenants, ", "))
202+
}
203+
204+
if len(compactorCfg.DisabledTenants) > 0 {
205+
c.disabledUsers = map[string]struct{}{}
206+
for _, u := range compactorCfg.DisabledTenants {
207+
c.disabledUsers[u] = struct{}{}
208+
}
209+
210+
level.Info(c.logger).Log("msg", "using disabled users", "disabled", strings.Join(compactorCfg.DisabledTenants, ", "))
211+
}
212+
182213
c.Service = services.NewBasicService(c.starting, c.running, c.stopping)
183214

184215
return c, nil
@@ -322,15 +353,13 @@ func (c *Compactor) compactUsers(ctx context.Context) error {
322353
return ctx.Err()
323354
}
324355

325-
// If sharding is enabled, ensure the user ID belongs to our shard.
326-
if c.compactorCfg.ShardingEnabled {
327-
if owned, err := c.ownUser(userID); err != nil {
328-
level.Warn(c.logger).Log("msg", "unable to check if user is owned by this shard", "user", userID, "err", err)
329-
continue
330-
} else if !owned {
331-
level.Debug(c.logger).Log("msg", "skipping user because not owned by this shard", "user", userID)
332-
continue
333-
}
356+
// Ensure the user ID belongs to our shard.
357+
if owned, err := c.ownUser(userID); err != nil {
358+
level.Warn(c.logger).Log("msg", "unable to check if user is owned by this shard", "user", userID, "err", err)
359+
continue
360+
} else if !owned {
361+
level.Debug(c.logger).Log("msg", "skipping user because not owned by this shard", "user", userID)
362+
continue
334363
}
335364

336365
level.Info(c.logger).Log("msg", "starting compaction of user blocks", "user", userID)
@@ -444,6 +473,10 @@ func (c *Compactor) discoverUsers(ctx context.Context) ([]string, error) {
444473
}
445474

446475
func (c *Compactor) ownUser(userID string) (bool, error) {
476+
if !isAllowedUser(c.enabledUsers, c.disabledUsers, userID) {
477+
return false, nil
478+
}
479+
447480
// Always owned if sharding is disabled.
448481
if !c.compactorCfg.ShardingEnabled {
449482
return true, nil
@@ -466,3 +499,19 @@ func (c *Compactor) ownUser(userID string) (bool, error) {
466499

467500
return rs.Ingesters[0].Addr == c.ringLifecycler.Addr, nil
468501
}
502+
503+
func isAllowedUser(enabledUsers, disabledUsers map[string]struct{}, userID string) bool {
504+
if len(enabledUsers) > 0 {
505+
if _, ok := enabledUsers[userID]; !ok {
506+
return false
507+
}
508+
}
509+
510+
if len(disabledUsers) > 0 {
511+
if _, ok := disabledUsers[userID]; ok {
512+
return false
513+
}
514+
}
515+
516+
return true
517+
}

pkg/compactor/compactor_test.go

Lines changed: 42 additions & 0 deletions
Original file line number | Diff line number | Diff line change
@@ -937,3 +937,45 @@ func mockDeletionMarkJSON(id string, deletionTime time.Time) string {
937937

938938
return string(content)
939939
}
940+
941+
func TestAllowedUser(t *testing.T) {
942+
testCases := map[string]struct {
943+
enabled, disabled map[string]struct{}
944+
user string
945+
expected bool
946+
}{
947+
"no enabled or disabled": {
948+
user: "test",
949+
expected: true,
950+
},
951+
952+
"only enabled, enabled": {
953+
enabled: map[string]struct{}{"user": {}},
954+
user: "user",
955+
expected: true,
956+
},
957+
958+
"only enabled, disabled": {
959+
enabled: map[string]struct{}{"user": {}},
960+
user: "not user",
961+
expected: false,
962+
},
963+
964+
"only disabled, disabled": {
965+
disabled: map[string]struct{}{"user": {}},
966+
user: "user",
967+
expected: false,
968+
},
969+
970+
"only disabled, enabled": {
971+
disabled: map[string]struct{}{"user": {}},
972+
user: "not user",
973+
expected: true,
974+
},
975+
}
976+
for name, tc := range testCases {
977+
t.Run(name, func(t *testing.T) {
978+
require.Equal(t, tc.expected, isAllowedUser(tc.enabled, tc.disabled, tc.user))
979+
})
980+
}
981+
}

0 commit comments

Comments (0)