From 03c2aff0aa4f100820a91c07d9fc83e7cc4db0d4 Mon Sep 17 00:00:00 2001 From: Alex Le Date: Tue, 14 Jan 2025 13:26:58 -0800 Subject: [PATCH 1/4] Add cleaner logic to clean partition compaction blocks and related files Signed-off-by: Alex Le --- pkg/compactor/blocks_cleaner.go | 131 ++++++++++++++++++++++++ pkg/compactor/blocks_cleaner_test.go | 95 +++++++++++++++-- pkg/compactor/compactor.go | 6 +- pkg/compactor/partitioned_group_info.go | 16 +++ 4 files changed, 240 insertions(+), 8 deletions(-) diff --git a/pkg/compactor/blocks_cleaner.go b/pkg/compactor/blocks_cleaner.go index 3ea46a5f38a..2ea7b307f89 100644 --- a/pkg/compactor/blocks_cleaner.go +++ b/pkg/compactor/blocks_cleaner.go @@ -3,6 +3,7 @@ package compactor import ( "context" "fmt" + "strings" "sync" "time" @@ -39,6 +40,8 @@ type BlocksCleanerConfig struct { CleanupConcurrency int BlockDeletionMarksMigrationEnabled bool // TODO Discuss whether we should remove it in Cortex 1.8.0 and document that upgrading to 1.7.0 before 1.8.0 is required. TenantCleanupDelay time.Duration // Delay before removing tenant deletion mark and "debug". + ShardingStrategy string + CompactionStrategy string } type BlocksCleaner struct { @@ -57,6 +60,7 @@ type BlocksCleaner struct { cleanerVisitMarkerTimeout time.Duration cleanerVisitMarkerFileUpdateInterval time.Duration + compactionVisitMarkerTimeout time.Duration // Metrics. runsStarted *prometheus.CounterVec @@ -73,12 +77,16 @@ type BlocksCleaner struct { tenantBucketIndexLastUpdate *prometheus.GaugeVec tenantBlocksCleanedTotal *prometheus.CounterVec tenantCleanDuration *prometheus.GaugeVec + remainingPlannedCompactions *prometheus.GaugeVec + inProgressCompactions *prometheus.GaugeVec + oldestPartitionGroupOffset *prometheus.GaugeVec } func NewBlocksCleaner( cfg BlocksCleanerConfig, bucketClient objstore.InstrumentedBucket, usersScanner *cortex_tsdb.UsersScanner, + compactionVisitMarkerTimeout time.Duration, cfgProvider ConfigProvider, logger log.Logger, ringLifecyclerID string, @@ -86,11 +94,27 @@ func NewBlocksCleaner( cleanerVisitMarkerTimeout time.Duration, cleanerVisitMarkerFileUpdateInterval time.Duration, blocksMarkedForDeletion *prometheus.CounterVec, + remainingPlannedCompactions *prometheus.GaugeVec, ) *BlocksCleaner { + + var inProgressCompactions *prometheus.GaugeVec + var oldestPartitionGroupOffset *prometheus.GaugeVec + if cfg.ShardingStrategy == util.ShardingStrategyShuffle { + inProgressCompactions = promauto.With(reg).NewGaugeVec(prometheus.GaugeOpts{ + Name: "cortex_compactor_in_progress_compactions", + Help: "Total number of in progress compactions. Only available with shuffle-sharding strategy", + }, commonLabels) + oldestPartitionGroupOffset = promauto.With(reg).NewGaugeVec(prometheus.GaugeOpts{ + Name: "cortex_compactor_oldest_partition_offset", + Help: "Time in seconds between now and the oldest created partition group not completed.", + }, commonLabels) + } + c := &BlocksCleaner{ cfg: cfg, bucketClient: bucketClient, usersScanner: usersScanner, + compactionVisitMarkerTimeout: compactionVisitMarkerTimeout, cfgProvider: cfgProvider, logger: log.With(logger, "component", "cleaner"), ringLifecyclerID: ringLifecyclerID, @@ -153,6 +177,9 @@ func NewBlocksCleaner( Name: "cortex_bucket_clean_duration_seconds", Help: "Duration of cleaner runtime for a tenant in seconds", }, commonLabels), + remainingPlannedCompactions: remainingPlannedCompactions, + inProgressCompactions: inProgressCompactions, + oldestPartitionGroupOffset: oldestPartitionGroupOffset, } c.Service = services.NewBasicService(c.starting, c.loop, nil) @@ -327,6 +354,11 @@ func (c *BlocksCleaner) scanUsers(ctx context.Context) ([]string, []string, erro c.tenantBlocksMarkedForNoCompaction.DeleteLabelValues(userID) c.tenantPartialBlocks.DeleteLabelValues(userID) c.tenantBucketIndexLastUpdate.DeleteLabelValues(userID) + if c.cfg.ShardingStrategy == util.ShardingStrategyShuffle { + c.inProgressCompactions.DeleteLabelValues(userID) + c.remainingPlannedCompactions.DeleteLabelValues(userID) + c.oldestPartitionGroupOffset.DeleteLabelValues(userID) + } } } c.lastOwnedUsers = allUsers @@ -447,6 +479,15 @@ func (c *BlocksCleaner) deleteUserMarkedForDeletion(ctx context.Context, userLog level.Info(userLogger).Log("msg", "deleted files under "+block.DebugMetas+" for tenant marked for deletion", "count", deleted) } + if c.cfg.CompactionStrategy == util.CompactionStrategyPartitioning { + // Clean up partitioned group info files + if deleted, err := bucket.DeletePrefix(ctx, userBucket, PartitionedGroupDirectory, userLogger); err != nil { + return errors.Wrap(err, "failed to delete "+PartitionedGroupDirectory) + } else if deleted > 0 { + level.Info(userLogger).Log("msg", "deleted files under "+PartitionedGroupDirectory+" for tenant marked for deletion", "count", deleted) + } + } + if deleted, err := bucket.DeletePrefix(ctx, userBucket, bucketindex.MarkersPathname, userLogger); err != nil { return errors.Wrap(err, "failed to delete marker files") } else if deleted > 0 { @@ -592,6 +633,12 @@ func (c *BlocksCleaner) cleanUser(ctx context.Context, userLogger log.Logger, us } level.Info(userLogger).Log("msg", "finish writing new index", "duration", time.Since(begin), "duration_ms", time.Since(begin).Milliseconds()) + if c.cfg.ShardingStrategy == util.ShardingStrategyShuffle && c.cfg.CompactionStrategy == util.CompactionStrategyPartitioning { + begin = time.Now() + c.cleanPartitionedGroupInfo(ctx, userBucket, userLogger, userID) + level.Info(userLogger).Log("msg", "finish cleaning partitioned group info files", "duration", time.Since(begin), "duration_ms", time.Since(begin).Milliseconds()) + } + c.tenantBlocks.WithLabelValues(userID).Set(float64(len(idx.Blocks))) c.tenantBlocksMarkedForDelete.WithLabelValues(userID).Set(float64(len(idx.BlockDeletionMarks))) c.tenantBlocksMarkedForNoCompaction.WithLabelValues(userID).Set(float64(totalBlocksBlocksMarkedForNoCompaction)) @@ -600,6 +647,90 @@ func (c *BlocksCleaner) cleanUser(ctx context.Context, userLogger log.Logger, us return nil } +func (c *BlocksCleaner) cleanPartitionedGroupInfo(ctx context.Context, userBucket objstore.InstrumentedBucket, userLogger log.Logger, userID string) { + deletePartitionedGroupInfo := make(map[*PartitionedGroupInfo]struct { + path string + status PartitionedGroupStatus + }) + err := userBucket.Iter(ctx, PartitionedGroupDirectory, func(file string) error { + if strings.Contains(file, PartitionVisitMarkerDirectory) { + return nil + } + partitionedGroupInfo, err := ReadPartitionedGroupInfoFile(ctx, userBucket, userLogger, file) + if err != nil { + level.Warn(userLogger).Log("msg", "failed to read partitioned group info", "partitioned_group_info", file) + return nil + } + + status := partitionedGroupInfo.getPartitionedGroupStatus(ctx, userBucket, c.compactionVisitMarkerTimeout, userLogger) + level.Info(userLogger).Log("msg", "got partitioned group status", "partitioned_group_status", status.String()) + deletePartitionedGroupInfo[partitionedGroupInfo] = struct { + path string + status PartitionedGroupStatus + }{ + path: file, + status: status, + } + return nil + }) + + if err != nil { + level.Warn(userLogger).Log("msg", "error return when going through partitioned group directory", "err", err) + } + + remainingCompactions := 0 + inProgressCompactions := 0 + var oldestPartitionGroup *PartitionedGroupInfo + defer func() { + c.remainingPlannedCompactions.WithLabelValues(userID).Set(float64(remainingCompactions)) + c.inProgressCompactions.WithLabelValues(userID).Set(float64(inProgressCompactions)) + if c.oldestPartitionGroupOffset != nil { + if oldestPartitionGroup != nil { + c.oldestPartitionGroupOffset.WithLabelValues(userID).Set(float64(time.Now().Unix() - oldestPartitionGroup.CreationTime)) + level.Info(userLogger).Log("msg", "partition group info with oldest creation time", "partitioned_group_id", oldestPartitionGroup.PartitionedGroupID, "creation_time", oldestPartitionGroup.CreationTime) + } else { + c.oldestPartitionGroupOffset.WithLabelValues(userID).Set(0) + } + } + }() + for partitionedGroupInfo, extraInfo := range deletePartitionedGroupInfo { + partitionedGroupInfoFile := extraInfo.path + + remainingCompactions += extraInfo.status.PendingPartitions + inProgressCompactions += extraInfo.status.InProgressPartitions + if oldestPartitionGroup == nil || partitionedGroupInfo.CreationTime < oldestPartitionGroup.CreationTime { + oldestPartitionGroup = partitionedGroupInfo + } + if extraInfo.status.CanDelete { + if extraInfo.status.IsCompleted { + // Try to remove all blocks included in partitioned group info + if err := partitionedGroupInfo.markAllBlocksForDeletion(ctx, userBucket, userLogger, c.blocksMarkedForDeletion, userID); err != nil { + level.Warn(userLogger).Log("msg", "unable to mark all blocks in partitioned group info for deletion", "partitioned_group_id", partitionedGroupInfo.PartitionedGroupID) + // if one block can not be marked for deletion, we should + // skip delete this partitioned group. next iteration + // would try it again. + continue + } + } + + if err := userBucket.Delete(ctx, partitionedGroupInfoFile); err != nil { + level.Warn(userLogger).Log("msg", "failed to delete partitioned group info", "partitioned_group_info", partitionedGroupInfoFile, "err", err) + } else { + level.Info(userLogger).Log("msg", "deleted partitioned group info", "partitioned_group_info", partitionedGroupInfoFile) + } + } + + if extraInfo.status.CanDelete || extraInfo.status.DeleteVisitMarker { + // Remove partition visit markers + if _, err := bucket.DeletePrefix(ctx, userBucket, GetPartitionVisitMarkerDirectoryPath(partitionedGroupInfo.PartitionedGroupID), userLogger); err != nil { + level.Warn(userLogger).Log("msg", "failed to delete partition visit markers for partitioned group", "partitioned_group_info", partitionedGroupInfoFile, "err", err) + } else { + level.Info(userLogger).Log("msg", "deleted partition visit markers for partitioned group", "partitioned_group_info", partitionedGroupInfoFile) + } + } + } +} + // cleanUserPartialBlocks delete partial blocks which are safe to be deleted. The provided partials map // and index are updated accordingly. func (c *BlocksCleaner) cleanUserPartialBlocks(ctx context.Context, userID string, partials map[ulid.ULID]error, idx *bucketindex.Index, userBucket objstore.InstrumentedBucket, userLogger log.Logger) { diff --git a/pkg/compactor/blocks_cleaner_test.go b/pkg/compactor/blocks_cleaner_test.go index d3c7aa6da9e..9c12dd4f14b 100644 --- a/pkg/compactor/blocks_cleaner_test.go +++ b/pkg/compactor/blocks_cleaner_test.go @@ -24,6 +24,7 @@ import ( "github.com/cortexproject/cortex/pkg/storage/tsdb" "github.com/cortexproject/cortex/pkg/storage/tsdb/bucketindex" cortex_testutil "github.com/cortexproject/cortex/pkg/storage/tsdb/testutil" + "github.com/cortexproject/cortex/pkg/util" util_log "github.com/cortexproject/cortex/pkg/util/log" "github.com/cortexproject/cortex/pkg/util/services" ) @@ -86,8 +87,9 @@ func TestBlockCleaner_KeyPermissionDenied(t *testing.T) { Name: blocksMarkedForDeletionName, Help: blocksMarkedForDeletionHelp, }, append(commonLabels, reasonLabelName)) + dummyGaugeVec := prometheus.NewGaugeVec(prometheus.GaugeOpts{}, []string{"test"}) - cleaner := NewBlocksCleaner(cfg, mbucket, scanner, cfgProvider, logger, "test-cleaner", nil, time.Minute, 30*time.Second, blocksMarkedForDeletion) + cleaner := NewBlocksCleaner(cfg, mbucket, scanner, 60*time.Second, cfgProvider, logger, "test-cleaner", nil, time.Minute, 30*time.Second, blocksMarkedForDeletion, dummyGaugeVec) // Clean User with no error cleaner.bucketClient = bkt @@ -193,8 +195,9 @@ func testBlocksCleanerWithOptions(t *testing.T, options testBlocksCleanerOptions Name: blocksMarkedForDeletionName, Help: blocksMarkedForDeletionHelp, }, append(commonLabels, reasonLabelName)) + dummyGaugeVec := prometheus.NewGaugeVec(prometheus.GaugeOpts{}, []string{"test"}) - cleaner := NewBlocksCleaner(cfg, bucketClient, scanner, cfgProvider, logger, "test-cleaner", reg, time.Minute, 30*time.Second, blocksMarkedForDeletion) + cleaner := NewBlocksCleaner(cfg, bucketClient, scanner, 60*time.Second, cfgProvider, logger, "test-cleaner", reg, time.Minute, 30*time.Second, blocksMarkedForDeletion, dummyGaugeVec) require.NoError(t, services.StartAndAwaitRunning(ctx, cleaner)) defer services.StopAndAwaitTerminated(ctx, cleaner) //nolint:errcheck @@ -354,8 +357,9 @@ func TestBlocksCleaner_ShouldContinueOnBlockDeletionFailure(t *testing.T) { Name: blocksMarkedForDeletionName, Help: blocksMarkedForDeletionHelp, }, append(commonLabels, reasonLabelName)) + dummyGaugeVec := prometheus.NewGaugeVec(prometheus.GaugeOpts{}, []string{"test"}) - cleaner := NewBlocksCleaner(cfg, bucketClient, scanner, cfgProvider, logger, "test-cleaner", nil, time.Minute, 30*time.Second, blocksMarkedForDeletion) + cleaner := NewBlocksCleaner(cfg, bucketClient, scanner, 60*time.Second, cfgProvider, logger, "test-cleaner", nil, time.Minute, 30*time.Second, blocksMarkedForDeletion, dummyGaugeVec) require.NoError(t, services.StartAndAwaitRunning(ctx, cleaner)) defer services.StopAndAwaitTerminated(ctx, cleaner) //nolint:errcheck @@ -418,8 +422,9 @@ func TestBlocksCleaner_ShouldRebuildBucketIndexOnCorruptedOne(t *testing.T) { Name: blocksMarkedForDeletionName, Help: blocksMarkedForDeletionHelp, }, append(commonLabels, reasonLabelName)) + dummyGaugeVec := prometheus.NewGaugeVec(prometheus.GaugeOpts{}, []string{"test"}) - cleaner := NewBlocksCleaner(cfg, bucketClient, scanner, cfgProvider, logger, "test-cleaner", nil, time.Minute, 30*time.Second, blocksMarkedForDeletion) + cleaner := NewBlocksCleaner(cfg, bucketClient, scanner, 60*time.Second, cfgProvider, logger, "test-cleaner", nil, time.Minute, 30*time.Second, blocksMarkedForDeletion, dummyGaugeVec) require.NoError(t, services.StartAndAwaitRunning(ctx, cleaner)) defer services.StopAndAwaitTerminated(ctx, cleaner) //nolint:errcheck @@ -476,8 +481,9 @@ func TestBlocksCleaner_ShouldRemoveMetricsForTenantsNotBelongingAnymoreToTheShar Name: blocksMarkedForDeletionName, Help: blocksMarkedForDeletionHelp, }, append(commonLabels, reasonLabelName)) + dummyGaugeVec := prometheus.NewGaugeVec(prometheus.GaugeOpts{}, []string{"test"}) - cleaner := NewBlocksCleaner(cfg, bucketClient, scanner, cfgProvider, logger, "test-cleaner", reg, time.Minute, 30*time.Second, blocksMarkedForDeletion) + cleaner := NewBlocksCleaner(cfg, bucketClient, scanner, 60*time.Second, cfgProvider, logger, "test-cleaner", reg, time.Minute, 30*time.Second, blocksMarkedForDeletion, dummyGaugeVec) activeUsers, deleteUsers, err := cleaner.scanUsers(ctx) require.NoError(t, err) require.NoError(t, cleaner.cleanUpActiveUsers(ctx, activeUsers, true)) @@ -617,8 +623,9 @@ func TestBlocksCleaner_ShouldRemoveBlocksOutsideRetentionPeriod(t *testing.T) { Name: blocksMarkedForDeletionName, Help: blocksMarkedForDeletionHelp, }, append(commonLabels, reasonLabelName)) + dummyGaugeVec := prometheus.NewGaugeVec(prometheus.GaugeOpts{}, []string{"test"}) - cleaner := NewBlocksCleaner(cfg, bucketClient, scanner, cfgProvider, logger, "test-cleaner", reg, time.Minute, 30*time.Second, blocksMarkedForDeletion) + cleaner := NewBlocksCleaner(cfg, bucketClient, scanner, 60*time.Second, cfgProvider, logger, "test-cleaner", reg, time.Minute, 30*time.Second, blocksMarkedForDeletion, dummyGaugeVec) assertBlockExists := func(user string, block ulid.ULID, expectExists bool) { exists, err := bucketClient.Exists(ctx, path.Join(user, block.String(), metadata.MetaFilename)) @@ -811,6 +818,82 @@ func TestBlocksCleaner_ShouldRemoveBlocksOutsideRetentionPeriod(t *testing.T) { } } +func TestBlocksCleaner_CleanPartitionedGroupInfo(t *testing.T) { + bucketClient, _ := cortex_testutil.PrepareFilesystemBucket(t) + bucketClient = bucketindex.BucketWithGlobalMarkers(bucketClient) + + ts := func(hours int) int64 { + return time.Now().Add(time.Duration(hours)*time.Hour).Unix() * 1000 + } + + userID := "user-1" + partitionedGroupID := uint32(123) + partitionCount := 1 + startTime := ts(-10) + endTime := ts(-8) + block1 := createTSDBBlock(t, bucketClient, userID, startTime, endTime, nil) + + cfg := BlocksCleanerConfig{ + DeletionDelay: time.Hour, + CleanupInterval: time.Minute, + CleanupConcurrency: 1, + ShardingStrategy: util.ShardingStrategyShuffle, + } + + ctx := context.Background() + logger := log.NewNopLogger() + reg := prometheus.NewPedanticRegistry() + scanner := tsdb.NewUsersScanner(bucketClient, tsdb.AllUsers, logger) + cfgProvider := newMockConfigProvider() + blocksMarkedForDeletion := promauto.With(reg).NewCounterVec(prometheus.CounterOpts{ + Name: blocksMarkedForDeletionName, + Help: blocksMarkedForDeletionHelp, + }, append(commonLabels, reasonLabelName)) + dummyGaugeVec := prometheus.NewGaugeVec(prometheus.GaugeOpts{}, []string{"test"}) + + cleaner := NewBlocksCleaner(cfg, bucketClient, scanner, 60*time.Second, cfgProvider, logger, "test-cleaner", reg, time.Minute, 30*time.Second, blocksMarkedForDeletion, dummyGaugeVec) + + userBucket := bucket.NewUserBucketClient(userID, bucketClient, cfgProvider) + + partitionedGroupInfo := PartitionedGroupInfo{ + PartitionedGroupID: partitionedGroupID, + PartitionCount: partitionCount, + Partitions: []Partition{ + { + PartitionID: 0, + Blocks: []ulid.ULID{block1}, + }, + }, + RangeStart: startTime, + RangeEnd: endTime, + CreationTime: time.Now().Add(-5 * time.Minute).Unix(), + Version: PartitionedGroupInfoVersion1, + } + _, err := UpdatePartitionedGroupInfo(ctx, userBucket, logger, partitionedGroupInfo) + require.NoError(t, err) + + visitMarker := &partitionVisitMarker{ + PartitionedGroupID: partitionedGroupID, + PartitionID: 0, + Status: Completed, + VisitTime: time.Now().Add(-2 * time.Minute).Unix(), + } + visitMarkerManager := NewVisitMarkerManager(userBucket, logger, "dummy-cleaner", visitMarker) + err = visitMarkerManager.updateVisitMarker(ctx) + require.NoError(t, err) + + cleaner.cleanPartitionedGroupInfo(ctx, userBucket, logger, userID) + + partitionedGroupFileExists, err := userBucket.Exists(ctx, GetPartitionedGroupFile(partitionedGroupID)) + require.NoError(t, err) + require.False(t, partitionedGroupFileExists) + + block1DeletionMarkerExists, err := userBucket.Exists(ctx, path.Join(block1.String(), metadata.DeletionMarkFilename)) + require.NoError(t, err) + require.True(t, block1DeletionMarkerExists) + +} + type mockConfigProvider struct { userRetentionPeriods map[string]time.Duration } diff --git a/pkg/compactor/compactor.go b/pkg/compactor/compactor.go index a1a9a8f8a2b..01f534f6296 100644 --- a/pkg/compactor/compactor.go +++ b/pkg/compactor/compactor.go @@ -657,8 +657,10 @@ func (c *Compactor) starting(ctx context.Context) error { CleanupConcurrency: c.compactorCfg.CleanupConcurrency, BlockDeletionMarksMigrationEnabled: c.compactorCfg.BlockDeletionMarksMigrationEnabled, TenantCleanupDelay: c.compactorCfg.TenantCleanupDelay, - }, c.bucketClient, c.usersScanner, c.limits, c.parentLogger, cleanerRingLifecyclerID, c.registerer, c.compactorCfg.CleanerVisitMarkerTimeout, c.compactorCfg.CleanerVisitMarkerFileUpdateInterval, - c.compactorMetrics.syncerBlocksMarkedForDeletion) + ShardingStrategy: c.compactorCfg.ShardingStrategy, + CompactionStrategy: c.compactorCfg.CompactionStrategy, + }, c.bucketClient, c.usersScanner, c.compactorCfg.CompactionVisitMarkerTimeout, c.limits, c.parentLogger, cleanerRingLifecyclerID, c.registerer, c.compactorCfg.CleanerVisitMarkerTimeout, c.compactorCfg.CleanerVisitMarkerFileUpdateInterval, + c.compactorMetrics.syncerBlocksMarkedForDeletion, c.compactorMetrics.remainingPlannedCompactions) // Ensure an initial cleanup occurred before starting the compactor. if err := services.StartAndAwaitRunning(ctx, c.blocksCleaner); err != nil { diff --git a/pkg/compactor/partitioned_group_info.go b/pkg/compactor/partitioned_group_info.go index 71d4c61639c..7ac2f88c0dd 100644 --- a/pkg/compactor/partitioned_group_info.go +++ b/pkg/compactor/partitioned_group_info.go @@ -14,7 +14,9 @@ import ( "github.com/go-kit/log/level" "github.com/oklog/ulid" "github.com/pkg/errors" + "github.com/prometheus/client_golang/prometheus" "github.com/thanos-io/objstore" + "github.com/thanos-io/thanos/pkg/block" "github.com/thanos-io/thanos/pkg/block/metadata" "github.com/cortexproject/cortex/pkg/util/runutil" @@ -232,6 +234,20 @@ func (p *PartitionedGroupInfo) isBlockNoCompact(ctx context.Context, userBucket return noCompactMarkerExists } +func (p *PartitionedGroupInfo) markAllBlocksForDeletion(ctx context.Context, userBucket objstore.InstrumentedBucket, userLogger log.Logger, blocksMarkedForDeletion *prometheus.CounterVec, userID string) error { + blocks := p.getAllBlocks() + for _, blockID := range blocks { + if p.doesBlockExist(ctx, userBucket, userLogger, blockID) && !p.isBlockDeleted(ctx, userBucket, userLogger, blockID) { + if err := block.MarkForDeletion(ctx, userLogger, userBucket, blockID, "delete block during partitioned group completion check", blocksMarkedForDeletion.WithLabelValues(userID, reasonValueRetention)); err != nil { + level.Warn(userLogger).Log("msg", "unable to mark block for deletion", "partitioned_group_id", p.PartitionedGroupID, "block", blockID.String()) + return err + } + level.Info(userLogger).Log("msg", "marked block for deletion during partitioned group info clean up", "partitioned_group_id", p.PartitionedGroupID, "block", blockID.String()) + } + } + return nil +} + func (p *PartitionedGroupInfo) String() string { var partitions []string for _, partition := range p.Partitions { From 6732e59db5a80c388c8627e7d52bec5ec50c0cfa Mon Sep 17 00:00:00 2001 From: Alex Le Date: Tue, 14 Jan 2025 13:37:16 -0800 Subject: [PATCH 2/4] refactored metrics Signed-off-by: Alex Le --- pkg/compactor/blocks_cleaner.go | 12 +++++++----- pkg/compactor/blocks_cleaner_test.go | 1 + 2 files changed, 8 insertions(+), 5 deletions(-) diff --git a/pkg/compactor/blocks_cleaner.go b/pkg/compactor/blocks_cleaner.go index 2ea7b307f89..238e28d2092 100644 --- a/pkg/compactor/blocks_cleaner.go +++ b/pkg/compactor/blocks_cleaner.go @@ -99,14 +99,14 @@ func NewBlocksCleaner( var inProgressCompactions *prometheus.GaugeVec var oldestPartitionGroupOffset *prometheus.GaugeVec - if cfg.ShardingStrategy == util.ShardingStrategyShuffle { + if cfg.ShardingStrategy == util.ShardingStrategyShuffle && cfg.CompactionStrategy == util.CompactionStrategyPartitioning { inProgressCompactions = promauto.With(reg).NewGaugeVec(prometheus.GaugeOpts{ Name: "cortex_compactor_in_progress_compactions", - Help: "Total number of in progress compactions. Only available with shuffle-sharding strategy", + Help: "Total number of in progress compactions. Only available with shuffle-sharding strategy and partitioning compaction strategy", }, commonLabels) oldestPartitionGroupOffset = promauto.With(reg).NewGaugeVec(prometheus.GaugeOpts{ Name: "cortex_compactor_oldest_partition_offset", - Help: "Time in seconds between now and the oldest created partition group not completed.", + Help: "Time in seconds between now and the oldest created partition group not completed. Only available with shuffle-sharding strategy and partitioning compaction strategy", }, commonLabels) } @@ -355,9 +355,11 @@ func (c *BlocksCleaner) scanUsers(ctx context.Context) ([]string, []string, erro c.tenantPartialBlocks.DeleteLabelValues(userID) c.tenantBucketIndexLastUpdate.DeleteLabelValues(userID) if c.cfg.ShardingStrategy == util.ShardingStrategyShuffle { - c.inProgressCompactions.DeleteLabelValues(userID) c.remainingPlannedCompactions.DeleteLabelValues(userID) - c.oldestPartitionGroupOffset.DeleteLabelValues(userID) + if c.cfg.CompactionStrategy == util.CompactionStrategyPartitioning { + c.inProgressCompactions.DeleteLabelValues(userID) + c.oldestPartitionGroupOffset.DeleteLabelValues(userID) + } } } } diff --git a/pkg/compactor/blocks_cleaner_test.go b/pkg/compactor/blocks_cleaner_test.go index 9c12dd4f14b..f7ac1a998b0 100644 --- a/pkg/compactor/blocks_cleaner_test.go +++ b/pkg/compactor/blocks_cleaner_test.go @@ -838,6 +838,7 @@ func TestBlocksCleaner_CleanPartitionedGroupInfo(t *testing.T) { CleanupInterval: time.Minute, CleanupConcurrency: 1, ShardingStrategy: util.ShardingStrategyShuffle, + CompactionStrategy: util.CompactionStrategyPartitioning, } ctx := context.Background() From 29b18c05b00bce374d0836e13445e4bcf2f96a67 Mon Sep 17 00:00:00 2001 From: Alex Le Date: Tue, 14 Jan 2025 16:09:29 -0800 Subject: [PATCH 3/4] refactor Signed-off-by: Alex Le --- pkg/compactor/blocks_cleaner.go | 6 +++--- 1 file changed, 3 insertions(+), 3 deletions(-) diff --git a/pkg/compactor/blocks_cleaner.go b/pkg/compactor/blocks_cleaner.go index 238e28d2092..96efab9fe6c 100644 --- a/pkg/compactor/blocks_cleaner.go +++ b/pkg/compactor/blocks_cleaner.go @@ -650,7 +650,7 @@ func (c *BlocksCleaner) cleanUser(ctx context.Context, userLogger log.Logger, us } func (c *BlocksCleaner) cleanPartitionedGroupInfo(ctx context.Context, userBucket objstore.InstrumentedBucket, userLogger log.Logger, userID string) { - deletePartitionedGroupInfo := make(map[*PartitionedGroupInfo]struct { + existentPartitionedGroupInfo := make(map[*PartitionedGroupInfo]struct { path string status PartitionedGroupStatus }) @@ -666,7 +666,7 @@ func (c *BlocksCleaner) cleanPartitionedGroupInfo(ctx context.Context, userBucke status := partitionedGroupInfo.getPartitionedGroupStatus(ctx, userBucket, c.compactionVisitMarkerTimeout, userLogger) level.Info(userLogger).Log("msg", "got partitioned group status", "partitioned_group_status", status.String()) - deletePartitionedGroupInfo[partitionedGroupInfo] = struct { + existentPartitionedGroupInfo[partitionedGroupInfo] = struct { path string status PartitionedGroupStatus }{ @@ -695,7 +695,7 @@ func (c *BlocksCleaner) cleanPartitionedGroupInfo(ctx context.Context, userBucke } } }() - for partitionedGroupInfo, extraInfo := range deletePartitionedGroupInfo { + for partitionedGroupInfo, extraInfo := range existentPartitionedGroupInfo { partitionedGroupInfoFile := extraInfo.path remainingCompactions += extraInfo.status.PendingPartitions From 214c6595e14ffcc4b0e59fdfc1f51ba78de510b1 Mon Sep 17 00:00:00 2001 From: Alex Le Date: Wed, 15 Jan 2025 12:19:21 -0800 Subject: [PATCH 4/4] update logs Signed-off-by: Alex Le --- pkg/compactor/blocks_cleaner.go | 4 ++-- pkg/compactor/partitioned_group_info.go | 7 ++++++- 2 files changed, 8 insertions(+), 3 deletions(-) diff --git a/pkg/compactor/blocks_cleaner.go b/pkg/compactor/blocks_cleaner.go index 96efab9fe6c..71dfc775dfd 100644 --- a/pkg/compactor/blocks_cleaner.go +++ b/pkg/compactor/blocks_cleaner.go @@ -665,7 +665,7 @@ func (c *BlocksCleaner) cleanPartitionedGroupInfo(ctx context.Context, userBucke } status := partitionedGroupInfo.getPartitionedGroupStatus(ctx, userBucket, c.compactionVisitMarkerTimeout, userLogger) - level.Info(userLogger).Log("msg", "got partitioned group status", "partitioned_group_status", status.String()) + level.Debug(userLogger).Log("msg", "got partitioned group status", "partitioned_group_status", status.String()) existentPartitionedGroupInfo[partitionedGroupInfo] = struct { path string status PartitionedGroupStatus @@ -689,7 +689,7 @@ func (c *BlocksCleaner) cleanPartitionedGroupInfo(ctx context.Context, userBucke if c.oldestPartitionGroupOffset != nil { if oldestPartitionGroup != nil { c.oldestPartitionGroupOffset.WithLabelValues(userID).Set(float64(time.Now().Unix() - oldestPartitionGroup.CreationTime)) - level.Info(userLogger).Log("msg", "partition group info with oldest creation time", "partitioned_group_id", oldestPartitionGroup.PartitionedGroupID, "creation_time", oldestPartitionGroup.CreationTime) + level.Debug(userLogger).Log("msg", "partition group info with oldest creation time", "partitioned_group_id", oldestPartitionGroup.PartitionedGroupID, "creation_time", oldestPartitionGroup.CreationTime) } else { c.oldestPartitionGroupOffset.WithLabelValues(userID).Set(0) } diff --git a/pkg/compactor/partitioned_group_info.go b/pkg/compactor/partitioned_group_info.go index 7ac2f88c0dd..f1c429a07a6 100644 --- a/pkg/compactor/partitioned_group_info.go +++ b/pkg/compactor/partitioned_group_info.go @@ -236,13 +236,18 @@ func (p *PartitionedGroupInfo) isBlockNoCompact(ctx context.Context, userBucket func (p *PartitionedGroupInfo) markAllBlocksForDeletion(ctx context.Context, userBucket objstore.InstrumentedBucket, userLogger log.Logger, blocksMarkedForDeletion *prometheus.CounterVec, userID string) error { blocks := p.getAllBlocks() + deleteBlocksCount := 0 + defer func() { + level.Info(userLogger).Log("msg", "total number of blocks marked for deletion during partitioned group info clean up", "count", deleteBlocksCount) + }() for _, blockID := range blocks { if p.doesBlockExist(ctx, userBucket, userLogger, blockID) && !p.isBlockDeleted(ctx, userBucket, userLogger, blockID) { if err := block.MarkForDeletion(ctx, userLogger, userBucket, blockID, "delete block during partitioned group completion check", blocksMarkedForDeletion.WithLabelValues(userID, reasonValueRetention)); err != nil { level.Warn(userLogger).Log("msg", "unable to mark block for deletion", "partitioned_group_id", p.PartitionedGroupID, "block", blockID.String()) return err } - level.Info(userLogger).Log("msg", "marked block for deletion during partitioned group info clean up", "partitioned_group_id", p.PartitionedGroupID, "block", blockID.String()) + deleteBlocksCount++ + level.Debug(userLogger).Log("msg", "marked block for deletion during partitioned group info clean up", "partitioned_group_id", p.PartitionedGroupID, "block", blockID.String()) } } return nil