Commit 4db8906

Split compactor cleaner metrics

Signed-off-by: Daniel Deluiggi <[email protected]>

1 parent 826e32a, commit 4db8906

File tree: 3 files changed (+258, -60 lines)

pkg/compactor/blocks_cleaner.go

Lines changed: 104 additions & 48 deletions
@@ -236,6 +236,15 @@ func (c *BlocksCleaner) loop(ctx context.Context) error {
 	go func() {
 		c.runDeleteUserCleanup(ctx, deleteChan)
 	}()
+	var metricsChan chan *cleanerJob
+	if c.cfg.ShardingStrategy == util.ShardingStrategyShuffle &&
+		c.cfg.CompactionStrategy == util.CompactionStrategyPartitioning {
+		metricsChan = make(chan *cleanerJob)
+		defer close(metricsChan)
+		go func() {
+			c.runEmitMetricsWorker(ctx, metricsChan)
+		}()
+	}

 	for {
 		select {
@@ -269,6 +278,17 @@ func (c *BlocksCleaner) loop(ctx context.Context) error {
 				c.enqueueJobFailed.WithLabelValues(deletedStatus).Inc()
 			}

+			if metricsChan != nil {
+				select {
+				case metricsChan <- &cleanerJob{
+					users:     activeUsers,
+					timestamp: cleanJobTimestamp,
+				}:
+				default:
+					level.Warn(c.logger).Log("msg", "unable to push metrics job to metricsChan")
+				}
+			}
+
 		case <-ctx.Done():
 			return nil
 		}
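The send into metricsChan above goes through a select with a default branch, so a busy metrics worker never blocks the cleaner loop; the job for that cycle is dropped and a warning is logged instead. A minimal standalone sketch of that non-blocking-send pattern (the type and channel names below are illustrative, not the commit's):

package main

import "fmt"

type cleanerJob struct {
	users []string
}

func main() {
	metricsJobs := make(chan *cleanerJob) // unbuffered, like metricsChan

	// Non-blocking send: with no worker currently receiving, the default
	// branch runs and the job is dropped instead of stalling the loop.
	select {
	case metricsJobs <- &cleanerJob{users: []string{"user-1"}}:
		fmt.Println("metrics job enqueued")
	default:
		fmt.Println("metrics worker busy, skipping this job")
	}
}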
@@ -288,10 +308,25 @@ func (c *BlocksCleaner) checkRunError(runType string, err error) {
 	}
 }

-func (c *BlocksCleaner) runActiveUserCleanup(ctx context.Context, jobChan chan *cleanerJob) {
+func (c *BlocksCleaner) runEmitMetricsWorker(ctx context.Context, jobChan <-chan *cleanerJob) {
+	for job := range jobChan {
+		err := concurrency.ForEachUser(ctx, job.users, c.cfg.CleanupConcurrency, func(ctx context.Context, userID string) error {
+			userLogger := util_log.WithUserID(userID, c.logger)
+			userBucket := bucket.NewUserBucketClient(userID, c.bucketClient, c.cfgProvider)
+			c.emitUserMetrics(ctx, userLogger, userBucket, userID)
+			return nil
+		})
+
+		if err != nil {
+			level.Error(c.logger).Log("msg", "emit metrics failed", "err", err.Error())
+		}
+	}
+}
+
+func (c *BlocksCleaner) runActiveUserCleanup(ctx context.Context, jobChan <-chan *cleanerJob) {
 	for job := range jobChan {
 		if job.timestamp < time.Now().Add(-c.cfg.CleanupInterval).Unix() {
-			level.Warn(c.logger).Log("Active user cleaner job too old. Ignoring to get recent data")
+			level.Warn(c.logger).Log("msg", "Active user cleaner job too old. Ignoring to get recent data")
 			continue
 		}
 		err := c.cleanUpActiveUsers(ctx, job.users, false)
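The new runEmitMetricsWorker drains jobChan until loop() closes it (via defer close(metricsChan)) and fans out per-user work through concurrency.ForEachUser. A simplified, self-contained sketch of that producer/worker shape, with a plain loop standing in for the Cortex concurrency helper (all names here are illustrative):

package main

import "fmt"

type cleanerJob struct {
	users     []string
	timestamp int64
}

// runWorker drains jobs until the channel is closed, mirroring the
// for job := range jobChan loop above; emit is a stand-in for emitUserMetrics.
func runWorker(jobs <-chan *cleanerJob, emit func(user string)) {
	for job := range jobs {
		for _, user := range job.users {
			emit(user)
		}
	}
}

func main() {
	jobs := make(chan *cleanerJob)
	done := make(chan struct{})

	go func() {
		runWorker(jobs, func(user string) { fmt.Println("emitting metrics for", user) })
		close(done)
	}()

	jobs <- &cleanerJob{users: []string{"user-1", "user-2"}}
	close(jobs) // in the commit this happens via defer close(metricsChan) in loop()
	<-done
}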
@@ -726,59 +761,14 @@ func (c *BlocksCleaner) cleanUser(ctx context.Context, userLogger log.Logger, us
 }

 func (c *BlocksCleaner) cleanPartitionedGroupInfo(ctx context.Context, userBucket objstore.InstrumentedBucket, userLogger log.Logger, userID string) {
-	existentPartitionedGroupInfo := make(map[*PartitionedGroupInfo]struct {
-		path   string
-		status PartitionedGroupStatus
-	})
-	err := userBucket.Iter(ctx, PartitionedGroupDirectory, func(file string) error {
-		if strings.Contains(file, PartitionVisitMarkerDirectory) {
-			return nil
-		}
-		partitionedGroupInfo, err := ReadPartitionedGroupInfoFile(ctx, userBucket, userLogger, file)
-		if err != nil {
-			level.Warn(userLogger).Log("msg", "failed to read partitioned group info", "partitioned_group_info", file)
-			return nil
-		}
-
-		status := partitionedGroupInfo.getPartitionedGroupStatus(ctx, userBucket, c.compactionVisitMarkerTimeout, userLogger)
-		level.Debug(userLogger).Log("msg", "got partitioned group status", "partitioned_group_status", status.String())
-		existentPartitionedGroupInfo[partitionedGroupInfo] = struct {
-			path   string
-			status PartitionedGroupStatus
-		}{
-			path:   file,
-			status: status,
-		}
-		return nil
-	})
-
+	err, existentPartitionedGroupInfo := c.iterPartitionGroups(ctx, userBucket, userLogger)
 	if err != nil {
 		level.Warn(userLogger).Log("msg", "error return when going through partitioned group directory", "err", err)
 	}

-	remainingCompactions := 0
-	inProgressCompactions := 0
-	var oldestPartitionGroup *PartitionedGroupInfo
-	defer func() {
-		c.remainingPlannedCompactions.WithLabelValues(userID).Set(float64(remainingCompactions))
-		c.inProgressCompactions.WithLabelValues(userID).Set(float64(inProgressCompactions))
-		if c.oldestPartitionGroupOffset != nil {
-			if oldestPartitionGroup != nil {
-				c.oldestPartitionGroupOffset.WithLabelValues(userID).Set(float64(time.Now().Unix() - oldestPartitionGroup.CreationTime))
-				level.Debug(userLogger).Log("msg", "partition group info with oldest creation time", "partitioned_group_id", oldestPartitionGroup.PartitionedGroupID, "creation_time", oldestPartitionGroup.CreationTime)
-			} else {
-				c.oldestPartitionGroupOffset.WithLabelValues(userID).Set(0)
-			}
-		}
-	}()
 	for partitionedGroupInfo, extraInfo := range existentPartitionedGroupInfo {
 		partitionedGroupInfoFile := extraInfo.path

-		remainingCompactions += extraInfo.status.PendingPartitions
-		inProgressCompactions += extraInfo.status.InProgressPartitions
-		if oldestPartitionGroup == nil || partitionedGroupInfo.CreationTime < oldestPartitionGroup.CreationTime {
-			oldestPartitionGroup = partitionedGroupInfo
-		}
 		if extraInfo.status.CanDelete {
 			if extraInfo.status.IsCompleted {
 				// Try to remove all blocks included in partitioned group info
@@ -809,6 +799,72 @@ func (c *BlocksCleaner) cleanPartitionedGroupInfo(ctx context.Context, userBucke
 	}
 }

+func (c *BlocksCleaner) emitUserMetrics(ctx context.Context, userLogger log.Logger, userBucket objstore.InstrumentedBucket, userID string) {
+	err, existentPartitionedGroupInfo := c.iterPartitionGroups(ctx, userBucket, userLogger)
+	if err != nil {
+		level.Warn(userLogger).Log("msg", "error return when going through partitioned group directory", "err", err)
+	}
+
+	remainingCompactions := 0
+	inProgressCompactions := 0
+	completedCompaction := 0
+	var oldestPartitionGroup *PartitionedGroupInfo
+	defer func() {
+		c.remainingPlannedCompactions.WithLabelValues(userID).Set(float64(remainingCompactions))
+		c.inProgressCompactions.WithLabelValues(userID).Set(float64(inProgressCompactions))
+		if c.oldestPartitionGroupOffset != nil {
+			if oldestPartitionGroup != nil {
+				c.oldestPartitionGroupOffset.WithLabelValues(userID).Set(float64(time.Now().Unix() - oldestPartitionGroup.CreationTime))
+				level.Debug(userLogger).Log("msg", "partition group info with oldest creation time", "partitioned_group_id", oldestPartitionGroup.PartitionedGroupID, "creation_time", oldestPartitionGroup.CreationTime)
+			} else {
+				c.oldestPartitionGroupOffset.WithLabelValues(userID).Set(0)
+			}
+		}
+	}()
+	for partitionedGroupInfo, extraInfo := range existentPartitionedGroupInfo {
+		remainingCompactions += extraInfo.status.PendingPartitions
+		inProgressCompactions += extraInfo.status.InProgressPartitions
+		if oldestPartitionGroup == nil || partitionedGroupInfo.CreationTime < oldestPartitionGroup.CreationTime {
+			oldestPartitionGroup = partitionedGroupInfo
+		}
+		if extraInfo.status.IsCompleted {
+			completedCompaction += len(partitionedGroupInfo.Partitions)
+		}
+	}
+}
+
+func (c *BlocksCleaner) iterPartitionGroups(ctx context.Context, userBucket objstore.InstrumentedBucket, userLogger log.Logger) (error, map[*PartitionedGroupInfo]struct {
+	path   string
+	status PartitionedGroupStatus
+}) {
+	existentPartitionedGroupInfo := make(map[*PartitionedGroupInfo]struct {
+		path   string
+		status PartitionedGroupStatus
+	})
+	err := userBucket.Iter(ctx, PartitionedGroupDirectory, func(file string) error {
+		if strings.Contains(file, PartitionVisitMarkerDirectory) {
+			return nil
+		}
+		partitionedGroupInfo, err := ReadPartitionedGroupInfoFile(ctx, userBucket, userLogger, file)
+		if err != nil {
+			level.Warn(userLogger).Log("msg", "failed to read partitioned group info", "partitioned_group_info", file)
+			return nil
+		}
+
+		status := partitionedGroupInfo.getPartitionedGroupStatus(ctx, userBucket, c.compactionVisitMarkerTimeout, userLogger)
+		level.Debug(userLogger).Log("msg", "got partitioned group status", "partitioned_group_status", status.String())
+		existentPartitionedGroupInfo[partitionedGroupInfo] = struct {
+			path   string
+			status PartitionedGroupStatus
+		}{
+			path:   file,
+			status: status,
+		}
+		return nil
+	})
+	return err, existentPartitionedGroupInfo
+}
+
 // cleanUserPartialBlocks delete partial blocks which are safe to be deleted. The provided partials map
 // and index are updated accordingly.
 func (c *BlocksCleaner) cleanUserPartialBlocks(ctx context.Context, userID string, partials map[ulid.ULID]error, idx *bucketindex.Index, userBucket objstore.InstrumentedBucket, userLogger log.Logger) {
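emitUserMetrics accumulates counters while walking the partition groups and applies the per-user gauges in a deferred function, so the gauges are set exactly once per run even if iteration finishes early. A hedged, standalone sketch of that pattern using the upstream Prometheus client (the metric name and helper below are invented for illustration):

package main

import (
	"fmt"

	"github.com/prometheus/client_golang/prometheus"
	"github.com/prometheus/client_golang/prometheus/testutil"
)

var inProgressCompactions = prometheus.NewGaugeVec(prometheus.GaugeOpts{
	Name: "example_in_progress_compactions",
	Help: "In progress compactions per user (illustrative).",
}, []string{"user"})

func emitMetricsForUser(userID string, inProgressStatuses []bool) {
	inProgress := 0
	defer func() {
		// Applied once per run, even on early return.
		inProgressCompactions.WithLabelValues(userID).Set(float64(inProgress))
	}()
	for _, s := range inProgressStatuses {
		if s {
			inProgress++
		}
	}
}

func main() {
	emitMetricsForUser("user-1", []bool{true, false, false})
	fmt.Println(testutil.ToFloat64(inProgressCompactions.WithLabelValues("user-1"))) // 1
}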

pkg/compactor/blocks_cleaner_test.go

Lines changed: 125 additions & 1 deletion
@@ -28,6 +28,7 @@ import (
 	"github.com/cortexproject/cortex/pkg/util"
 	util_log "github.com/cortexproject/cortex/pkg/util/log"
 	"github.com/cortexproject/cortex/pkg/util/services"
+	"github.com/prometheus/client_golang/prometheus/testutil"
 )

 type testBlocksCleanerOptions struct {
@@ -949,7 +950,6 @@ func TestBlocksCleaner_CleanPartitionedGroupInfo(t *testing.T) {
 	block2DeletionMarkerExists, err := userBucket.Exists(ctx, path.Join(block2.String(), metadata.DeletionMarkFilename))
 	require.NoError(t, err)
 	require.False(t, block2DeletionMarkerExists)
-
 }

 func TestBlocksCleaner_DeleteEmptyBucketIndex(t *testing.T) {
@@ -1021,6 +1021,130 @@
 	require.True(t, userBucket.IsObjNotFoundErr(err))
 }

+func TestBlocksCleaner_EmitUserMetrics(t *testing.T) {
+	bucketClient, _ := cortex_testutil.PrepareFilesystemBucket(t)
+	bucketClient = bucketindex.BucketWithGlobalMarkers(bucketClient)
+
+	cfg := BlocksCleanerConfig{
+		DeletionDelay:      time.Hour,
+		CleanupInterval:    time.Minute,
+		CleanupConcurrency: 1,
+		ShardingStrategy:   util.ShardingStrategyShuffle,
+		CompactionStrategy: util.CompactionStrategyPartitioning,
+	}
+
+	ctx := context.Background()
+	logger := log.NewNopLogger()
+	registry := prometheus.NewPedanticRegistry()
+	scanner, err := users.NewScanner(tsdb.UsersScannerConfig{
+		Strategy: tsdb.UserScanStrategyList,
+	}, bucketClient, logger, registry)
+	require.NoError(t, err)
+	cfgProvider := newMockConfigProvider()
+	dummyCounterVec := prometheus.NewCounterVec(prometheus.CounterOpts{}, []string{"test"})
+	remainingPlannedCompactions := promauto.With(registry).NewGaugeVec(prometheus.GaugeOpts{
+		Name: "cortex_compactor_remaining_planned_compactions",
+		Help: "Total number of plans that remain to be compacted. Only available with shuffle-sharding strategy",
+	}, commonLabels)
+
+	cleaner := NewBlocksCleaner(cfg, bucketClient, scanner, 15*time.Minute, cfgProvider, logger, "test-cleaner", registry, time.Minute, 30*time.Second, dummyCounterVec, remainingPlannedCompactions)
+
+	ts := func(hours int) int64 {
+		return time.Now().Add(time.Duration(hours)*time.Hour).Unix() * 1000
+	}
+
+	userID := "user-1"
+	partitionedGroupID := uint32(123)
+	partitionCount := 5
+	startTime := ts(-10)
+	endTime := ts(-8)
+	userBucket := bucket.NewUserBucketClient(userID, bucketClient, cfgProvider)
+	partitionedGroupInfo := PartitionedGroupInfo{
+		PartitionedGroupID: partitionedGroupID,
+		PartitionCount:     partitionCount,
+		Partitions: []Partition{
+			{
+				PartitionID: 0,
+			},
+			{
+				PartitionID: 1,
+			},
+			{
+				PartitionID: 2,
+			},
+			{
+				PartitionID: 3,
+			},
+			{
+				PartitionID: 4,
+			},
+		},
+		RangeStart:   startTime,
+		RangeEnd:     endTime,
+		CreationTime: time.Now().Add(-1 * time.Hour).Unix(),
+		Version:      PartitionedGroupInfoVersion1,
+	}
+	_, err = UpdatePartitionedGroupInfo(ctx, userBucket, logger, partitionedGroupInfo)
+	require.NoError(t, err)
+
+	//InProgress with valid VisitTime
+	v0 := &partitionVisitMarker{
+		PartitionedGroupID: partitionedGroupID,
+		PartitionID:        0,
+		Status:             InProgress,
+		VisitTime:          time.Now().Add(-2 * time.Minute).Unix(),
+	}
+	v0Manager := NewVisitMarkerManager(userBucket, logger, "dummy-cleaner", v0)
+	err = v0Manager.updateVisitMarker(ctx)
+	require.NoError(t, err)
+
+	//InProgress with expired VisitTime
+	v1 := &partitionVisitMarker{
+		PartitionedGroupID: partitionedGroupID,
+		PartitionID:        1,
+		Status:             InProgress,
+		VisitTime:          time.Now().Add(-30 * time.Minute).Unix(),
+	}
+	v1Manager := NewVisitMarkerManager(userBucket, logger, "dummy-cleaner", v1)
+	err = v1Manager.updateVisitMarker(ctx)
+	require.NoError(t, err)
+
+	//V2 and V3 are pending
+	//V4 is completed
+	v4 := &partitionVisitMarker{
+		PartitionedGroupID: partitionedGroupID,
+		PartitionID:        4,
+		Status:             Completed,
+		VisitTime:          time.Now().Add(-20 * time.Minute).Unix(),
+	}
+	v4Manager := NewVisitMarkerManager(userBucket, logger, "dummy-cleaner", v4)
+	err = v4Manager.updateVisitMarker(ctx)
+	require.NoError(t, err)
+
+	cleaner.emitUserMetrics(ctx, logger, userBucket, userID)
+
+	metricNames := []string{
+		"cortex_compactor_remaining_planned_compactions",
+		"cortex_compactor_in_progress_compactions",
+		"cortex_compactor_oldest_partition_offset",
+	}
+
+	// Check tracked Prometheus metrics
+	expectedMetrics := `
+		# HELP cortex_compactor_in_progress_compactions Total number of in progress compactions. Only available with shuffle-sharding strategy and partitioning compaction strategy
+		# TYPE cortex_compactor_in_progress_compactions gauge
+		cortex_compactor_in_progress_compactions{user="user-1"} 1
+		# HELP cortex_compactor_oldest_partition_offset Time in seconds between now and the oldest created partition group not completed. Only available with shuffle-sharding strategy and partitioning compaction strategy
+		# TYPE cortex_compactor_oldest_partition_offset gauge
+		cortex_compactor_oldest_partition_offset{user="user-1"} 3600
+		# HELP cortex_compactor_remaining_planned_compactions Total number of plans that remain to be compacted. Only available with shuffle-sharding strategy
+		# TYPE cortex_compactor_remaining_planned_compactions gauge
+		cortex_compactor_remaining_planned_compactions{user="user-1"} 3
+	`
+
+	assert.NoError(t, testutil.GatherAndCompare(registry, strings.NewReader(expectedMetrics), metricNames...))
+}
+
 type mockConfigProvider struct {
 	userRetentionPeriods    map[string]time.Duration
 	parquetConverterEnabled map[string]bool
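The new test's final assertion relies on testutil.GatherAndCompare, which gathers from the registry and compares only the named metric families against the expected exposition text. An illustrative, self-contained example of that check outside the Cortex test harness (the metric below is invented for the example):

package main

import (
	"fmt"
	"strings"

	"github.com/prometheus/client_golang/prometheus"
	"github.com/prometheus/client_golang/prometheus/testutil"
)

func main() {
	reg := prometheus.NewPedanticRegistry()
	g := prometheus.NewGaugeVec(prometheus.GaugeOpts{
		Name: "example_in_progress_compactions",
		Help: "Example gauge.",
	}, []string{"user"})
	reg.MustRegister(g)
	g.WithLabelValues("user-1").Set(1)

	expected := `
# HELP example_in_progress_compactions Example gauge.
# TYPE example_in_progress_compactions gauge
example_in_progress_compactions{user="user-1"} 1
`
	// Only the listed metric names are compared, so unrelated collectors in
	// the registry do not make the check fail.
	err := testutil.GatherAndCompare(reg, strings.NewReader(expected), "example_in_progress_compactions")
	fmt.Println("mismatch:", err) // <nil> when the gathered output matches
}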
