-
Notifications
You must be signed in to change notification settings - Fork 840
Add cleaner logic to clean partition compaction blocks and related files #6507
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
Changes from 2 commits
File filter
Filter by extension
Conversations
Jump to
Diff view
Diff view
There are no files selected for viewing
| Original file line number | Diff line number | Diff line change |
|---|---|---|
|
|
@@ -3,6 +3,7 @@ package compactor | |
| import ( | ||
| "context" | ||
| "fmt" | ||
| "strings" | ||
| "sync" | ||
| "time" | ||
|
|
||
|
|
@@ -39,6 +40,8 @@ type BlocksCleanerConfig struct { | |
| CleanupConcurrency int | ||
| BlockDeletionMarksMigrationEnabled bool // TODO Discuss whether we should remove it in Cortex 1.8.0 and document that upgrading to 1.7.0 before 1.8.0 is required. | ||
| TenantCleanupDelay time.Duration // Delay before removing tenant deletion mark and "debug". | ||
| ShardingStrategy string | ||
| CompactionStrategy string | ||
| } | ||
|
|
||
| type BlocksCleaner struct { | ||
|
|
@@ -57,6 +60,7 @@ type BlocksCleaner struct { | |
|
|
||
| cleanerVisitMarkerTimeout time.Duration | ||
| cleanerVisitMarkerFileUpdateInterval time.Duration | ||
| compactionVisitMarkerTimeout time.Duration | ||
|
|
||
| // Metrics. | ||
| runsStarted *prometheus.CounterVec | ||
|
|
@@ -73,24 +77,44 @@ type BlocksCleaner struct { | |
| tenantBucketIndexLastUpdate *prometheus.GaugeVec | ||
| tenantBlocksCleanedTotal *prometheus.CounterVec | ||
| tenantCleanDuration *prometheus.GaugeVec | ||
| remainingPlannedCompactions *prometheus.GaugeVec | ||
| inProgressCompactions *prometheus.GaugeVec | ||
| oldestPartitionGroupOffset *prometheus.GaugeVec | ||
| } | ||
|
|
||
| func NewBlocksCleaner( | ||
| cfg BlocksCleanerConfig, | ||
| bucketClient objstore.InstrumentedBucket, | ||
| usersScanner *cortex_tsdb.UsersScanner, | ||
| compactionVisitMarkerTimeout time.Duration, | ||
| cfgProvider ConfigProvider, | ||
| logger log.Logger, | ||
| ringLifecyclerID string, | ||
| reg prometheus.Registerer, | ||
| cleanerVisitMarkerTimeout time.Duration, | ||
| cleanerVisitMarkerFileUpdateInterval time.Duration, | ||
| blocksMarkedForDeletion *prometheus.CounterVec, | ||
| remainingPlannedCompactions *prometheus.GaugeVec, | ||
| ) *BlocksCleaner { | ||
|
|
||
| var inProgressCompactions *prometheus.GaugeVec | ||
| var oldestPartitionGroupOffset *prometheus.GaugeVec | ||
| if cfg.ShardingStrategy == util.ShardingStrategyShuffle && cfg.CompactionStrategy == util.CompactionStrategyPartitioning { | ||
| inProgressCompactions = promauto.With(reg).NewGaugeVec(prometheus.GaugeOpts{ | ||
| Name: "cortex_compactor_in_progress_compactions", | ||
| Help: "Total number of in progress compactions. Only available with shuffle-sharding strategy and partitioning compaction strategy", | ||
| }, commonLabels) | ||
| oldestPartitionGroupOffset = promauto.With(reg).NewGaugeVec(prometheus.GaugeOpts{ | ||
| Name: "cortex_compactor_oldest_partition_offset", | ||
| Help: "Time in seconds between now and the oldest created partition group not completed. Only available with shuffle-sharding strategy and partitioning compaction strategy", | ||
| }, commonLabels) | ||
| } | ||
|
|
||
| c := &BlocksCleaner{ | ||
| cfg: cfg, | ||
| bucketClient: bucketClient, | ||
| usersScanner: usersScanner, | ||
| compactionVisitMarkerTimeout: compactionVisitMarkerTimeout, | ||
| cfgProvider: cfgProvider, | ||
| logger: log.With(logger, "component", "cleaner"), | ||
| ringLifecyclerID: ringLifecyclerID, | ||
|
|
@@ -153,6 +177,9 @@ func NewBlocksCleaner( | |
| Name: "cortex_bucket_clean_duration_seconds", | ||
| Help: "Duration of cleaner runtime for a tenant in seconds", | ||
| }, commonLabels), | ||
| remainingPlannedCompactions: remainingPlannedCompactions, | ||
| inProgressCompactions: inProgressCompactions, | ||
| oldestPartitionGroupOffset: oldestPartitionGroupOffset, | ||
| } | ||
|
|
||
| c.Service = services.NewBasicService(c.starting, c.loop, nil) | ||
|
|
@@ -327,6 +354,13 @@ func (c *BlocksCleaner) scanUsers(ctx context.Context) ([]string, []string, erro | |
| c.tenantBlocksMarkedForNoCompaction.DeleteLabelValues(userID) | ||
| c.tenantPartialBlocks.DeleteLabelValues(userID) | ||
| c.tenantBucketIndexLastUpdate.DeleteLabelValues(userID) | ||
| if c.cfg.ShardingStrategy == util.ShardingStrategyShuffle { | ||
| c.remainingPlannedCompactions.DeleteLabelValues(userID) | ||
| if c.cfg.CompactionStrategy == util.CompactionStrategyPartitioning { | ||
| c.inProgressCompactions.DeleteLabelValues(userID) | ||
| c.oldestPartitionGroupOffset.DeleteLabelValues(userID) | ||
| } | ||
| } | ||
| } | ||
| } | ||
| c.lastOwnedUsers = allUsers | ||
|
|
@@ -447,6 +481,15 @@ func (c *BlocksCleaner) deleteUserMarkedForDeletion(ctx context.Context, userLog | |
| level.Info(userLogger).Log("msg", "deleted files under "+block.DebugMetas+" for tenant marked for deletion", "count", deleted) | ||
| } | ||
|
|
||
| if c.cfg.CompactionStrategy == util.CompactionStrategyPartitioning { | ||
| // Clean up partitioned group info files | ||
| if deleted, err := bucket.DeletePrefix(ctx, userBucket, PartitionedGroupDirectory, userLogger); err != nil { | ||
| return errors.Wrap(err, "failed to delete "+PartitionedGroupDirectory) | ||
| } else if deleted > 0 { | ||
| level.Info(userLogger).Log("msg", "deleted files under "+PartitionedGroupDirectory+" for tenant marked for deletion", "count", deleted) | ||
| } | ||
| } | ||
|
|
||
| if deleted, err := bucket.DeletePrefix(ctx, userBucket, bucketindex.MarkersPathname, userLogger); err != nil { | ||
| return errors.Wrap(err, "failed to delete marker files") | ||
| } else if deleted > 0 { | ||
|
|
@@ -592,6 +635,12 @@ func (c *BlocksCleaner) cleanUser(ctx context.Context, userLogger log.Logger, us | |
| } | ||
| level.Info(userLogger).Log("msg", "finish writing new index", "duration", time.Since(begin), "duration_ms", time.Since(begin).Milliseconds()) | ||
|
|
||
| if c.cfg.ShardingStrategy == util.ShardingStrategyShuffle && c.cfg.CompactionStrategy == util.CompactionStrategyPartitioning { | ||
| begin = time.Now() | ||
| c.cleanPartitionedGroupInfo(ctx, userBucket, userLogger, userID) | ||
| level.Info(userLogger).Log("msg", "finish cleaning partitioned group info files", "duration", time.Since(begin), "duration_ms", time.Since(begin).Milliseconds()) | ||
| } | ||
|
|
||
| c.tenantBlocks.WithLabelValues(userID).Set(float64(len(idx.Blocks))) | ||
| c.tenantBlocksMarkedForDelete.WithLabelValues(userID).Set(float64(len(idx.BlockDeletionMarks))) | ||
| c.tenantBlocksMarkedForNoCompaction.WithLabelValues(userID).Set(float64(totalBlocksBlocksMarkedForNoCompaction)) | ||
|
|
@@ -600,6 +649,90 @@ func (c *BlocksCleaner) cleanUser(ctx context.Context, userLogger log.Logger, us | |
| return nil | ||
| } | ||
|
|
||
| func (c *BlocksCleaner) cleanPartitionedGroupInfo(ctx context.Context, userBucket objstore.InstrumentedBucket, userLogger log.Logger, userID string) { | ||
| deletePartitionedGroupInfo := make(map[*PartitionedGroupInfo]struct { | ||
| path string | ||
| status PartitionedGroupStatus | ||
| }) | ||
| err := userBucket.Iter(ctx, PartitionedGroupDirectory, func(file string) error { | ||
| if strings.Contains(file, PartitionVisitMarkerDirectory) { | ||
| return nil | ||
| } | ||
| partitionedGroupInfo, err := ReadPartitionedGroupInfoFile(ctx, userBucket, userLogger, file) | ||
| if err != nil { | ||
| level.Warn(userLogger).Log("msg", "failed to read partitioned group info", "partitioned_group_info", file) | ||
| return nil | ||
| } | ||
|
|
||
| status := partitionedGroupInfo.getPartitionedGroupStatus(ctx, userBucket, c.compactionVisitMarkerTimeout, userLogger) | ||
| level.Info(userLogger).Log("msg", "got partitioned group status", "partitioned_group_status", status.String()) | ||
| deletePartitionedGroupInfo[partitionedGroupInfo] = struct { | ||
| path string | ||
| status PartitionedGroupStatus | ||
| }{ | ||
| path: file, | ||
| status: status, | ||
| } | ||
| return nil | ||
| }) | ||
|
|
||
| if err != nil { | ||
| level.Warn(userLogger).Log("msg", "error return when going through partitioned group directory", "err", err) | ||
|
Contributor
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. I don't see we return any error in the iter function above. Is this error mainly failure of listing files in the bucket?
Contributor
Author
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Right. In this code, we try to get as many as partition group info files. If there is any issue inside iter function, we just stop and processing what we got. Any retriable error would be covered by next cleaner cycle. |
||
| } | ||
|
|
||
| remainingCompactions := 0 | ||
| inProgressCompactions := 0 | ||
| var oldestPartitionGroup *PartitionedGroupInfo | ||
| defer func() { | ||
| c.remainingPlannedCompactions.WithLabelValues(userID).Set(float64(remainingCompactions)) | ||
| c.inProgressCompactions.WithLabelValues(userID).Set(float64(inProgressCompactions)) | ||
| if c.oldestPartitionGroupOffset != nil { | ||
| if oldestPartitionGroup != nil { | ||
| c.oldestPartitionGroupOffset.WithLabelValues(userID).Set(float64(time.Now().Unix() - oldestPartitionGroup.CreationTime)) | ||
| level.Info(userLogger).Log("msg", "partition group info with oldest creation time", "partitioned_group_id", oldestPartitionGroup.PartitionedGroupID, "creation_time", oldestPartitionGroup.CreationTime) | ||
| } else { | ||
| c.oldestPartitionGroupOffset.WithLabelValues(userID).Set(0) | ||
| } | ||
| } | ||
| }() | ||
| for partitionedGroupInfo, extraInfo := range deletePartitionedGroupInfo { | ||
| partitionedGroupInfoFile := extraInfo.path | ||
|
|
||
| remainingCompactions += extraInfo.status.PendingPartitions | ||
| inProgressCompactions += extraInfo.status.InProgressPartitions | ||
| if oldestPartitionGroup == nil || partitionedGroupInfo.CreationTime < oldestPartitionGroup.CreationTime { | ||
| oldestPartitionGroup = partitionedGroupInfo | ||
| } | ||
| if extraInfo.status.CanDelete { | ||
| if extraInfo.status.IsCompleted { | ||
| // Try to remove all blocks included in partitioned group info | ||
| if err := partitionedGroupInfo.markAllBlocksForDeletion(ctx, userBucket, userLogger, c.blocksMarkedForDeletion, userID); err != nil { | ||
| level.Warn(userLogger).Log("msg", "unable to mark all blocks in partitioned group info for deletion", "partitioned_group_id", partitionedGroupInfo.PartitionedGroupID) | ||
| // if one block can not be marked for deletion, we should | ||
| // skip delete this partitioned group. next iteration | ||
| // would try it again. | ||
| continue | ||
| } | ||
| } | ||
|
|
||
| if err := userBucket.Delete(ctx, partitionedGroupInfoFile); err != nil { | ||
| level.Warn(userLogger).Log("msg", "failed to delete partitioned group info", "partitioned_group_info", partitionedGroupInfoFile, "err", err) | ||
| } else { | ||
| level.Info(userLogger).Log("msg", "deleted partitioned group info", "partitioned_group_info", partitionedGroupInfoFile) | ||
| } | ||
| } | ||
|
|
||
| if extraInfo.status.CanDelete || extraInfo.status.DeleteVisitMarker { | ||
| // Remove partition visit markers | ||
| if _, err := bucket.DeletePrefix(ctx, userBucket, GetPartitionVisitMarkerDirectoryPath(partitionedGroupInfo.PartitionedGroupID), userLogger); err != nil { | ||
| level.Warn(userLogger).Log("msg", "failed to delete partition visit markers for partitioned group", "partitioned_group_info", partitionedGroupInfoFile, "err", err) | ||
| } else { | ||
| level.Info(userLogger).Log("msg", "deleted partition visit markers for partitioned group", "partitioned_group_info", partitionedGroupInfoFile) | ||
| } | ||
| } | ||
| } | ||
| } | ||
|
|
||
| // cleanUserPartialBlocks delete partial blocks which are safe to be deleted. The provided partials map | ||
| // and index are updated accordingly. | ||
| func (c *BlocksCleaner) cleanUserPartialBlocks(ctx context.Context, userID string, partials map[ulid.ULID]error, idx *bucketindex.Index, userBucket objstore.InstrumentedBucket, userLogger log.Logger) { | ||
|
|
||
Uh oh!
There was an error while loading. Please reload this page.