Skip to content

Commit

Permalink
Cleaner should not put deletion marker for blocks with no-compact mar…
Browse files Browse the repository at this point in the history
…ker (#6576)

Signed-off-by: Alex Le <[email protected]>
  • Loading branch information
alexqyle authored Feb 8, 2025
1 parent caaddfb commit 5b53a48
Show file tree
Hide file tree
Showing 2 changed files with 8 additions and 2 deletions.
8 changes: 7 additions & 1 deletion pkg/compactor/blocks_cleaner_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -832,6 +832,8 @@ func TestBlocksCleaner_CleanPartitionedGroupInfo(t *testing.T) {
startTime := ts(-10)
endTime := ts(-8)
block1 := createTSDBBlock(t, bucketClient, userID, startTime, endTime, nil)
block2 := createTSDBBlock(t, bucketClient, userID, startTime, endTime, nil)
createNoCompactionMark(t, bucketClient, userID, block2)

cfg := BlocksCleanerConfig{
DeletionDelay: time.Hour,
Expand Down Expand Up @@ -862,7 +864,7 @@ func TestBlocksCleaner_CleanPartitionedGroupInfo(t *testing.T) {
Partitions: []Partition{
{
PartitionID: 0,
Blocks: []ulid.ULID{block1},
Blocks: []ulid.ULID{block1, block2},
},
},
RangeStart: startTime,
Expand Down Expand Up @@ -893,6 +895,10 @@ func TestBlocksCleaner_CleanPartitionedGroupInfo(t *testing.T) {
require.NoError(t, err)
require.True(t, block1DeletionMarkerExists)

block2DeletionMarkerExists, err := userBucket.Exists(ctx, path.Join(block2.String(), metadata.DeletionMarkFilename))
require.NoError(t, err)
require.False(t, block2DeletionMarkerExists)

}

type mockConfigProvider struct {
Expand Down
2 changes: 1 addition & 1 deletion pkg/compactor/partitioned_group_info.go
Original file line number Diff line number Diff line change
Expand Up @@ -241,7 +241,7 @@ func (p *PartitionedGroupInfo) markAllBlocksForDeletion(ctx context.Context, use
level.Info(userLogger).Log("msg", "total number of blocks marked for deletion during partitioned group info clean up", "count", deleteBlocksCount)
}()
for _, blockID := range blocks {
if p.doesBlockExist(ctx, userBucket, userLogger, blockID) && !p.isBlockDeleted(ctx, userBucket, userLogger, blockID) {
if p.doesBlockExist(ctx, userBucket, userLogger, blockID) && !p.isBlockDeleted(ctx, userBucket, userLogger, blockID) && !p.isBlockNoCompact(ctx, userBucket, userLogger, blockID) {
if err := block.MarkForDeletion(ctx, userLogger, userBucket, blockID, "delete block during partitioned group completion check", blocksMarkedForDeletion.WithLabelValues(userID, reasonValueRetention)); err != nil {
level.Warn(userLogger).Log("msg", "unable to mark block for deletion", "partitioned_group_id", p.PartitionedGroupID, "block", blockID.String())
return err
Expand Down

0 comments on commit 5b53a48

Please sign in to comment.