Skip to content

Commit

Permalink
Cleaner would delete bucket index when there is no block in bucket st…
Browse files Browse the repository at this point in the history
…ore (#6577)

* Cleaner would delete bucket index and sync status file if there is no block in bucket store

Signed-off-by: Alex Le <[email protected]>

* fix test

Signed-off-by: Alex Le <[email protected]>

* updated CHANGELOG

Signed-off-by: Alex Le <[email protected]>

---------

Signed-off-by: Alex Le <[email protected]>
  • Loading branch information
alexqyle authored Feb 11, 2025
1 parent 5b53a48 commit d8cb7f1
Show file tree
Hide file tree
Showing 6 changed files with 89 additions and 26 deletions.
2 changes: 2 additions & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,8 @@
* [ENHANCEMENT] Query Frontend: Add a `too_many_tenants` reason label value to `cortex_rejected_queries_total` metric to track the rejected query count due to the # of tenant limits. #6569
* [BUGFIX] Ingester: Avoid error or early throttling when READONLY ingesters are present in the ring #6517
* [BUGFIX] Ingester: Fix labelset data race condition. #6573
* [BUGFIX] Compactor: Cleaner should not put deletion marker for blocks with no-compact marker. #6576
* [BUGFIX] Compactor: Cleaner would delete bucket index when there is no block in bucket store. #6577

## 1.19.0 in progress

Expand Down
29 changes: 23 additions & 6 deletions pkg/compactor/blocks_cleaner.go
Original file line number Diff line number Diff line change
Expand Up @@ -505,6 +505,8 @@ func (c *BlocksCleaner) cleanUser(ctx context.Context, userLogger log.Logger, us
c.blocksMarkedForDeletion.WithLabelValues(userID, reasonValueRetention)
startTime := time.Now()

bucketIndexDeleted := false

level.Info(userLogger).Log("msg", "started blocks cleanup and maintenance")
defer func() {
if returnErr != nil {
Expand Down Expand Up @@ -540,7 +542,14 @@ func (c *BlocksCleaner) cleanUser(ctx context.Context, userLogger log.Logger, us
idx, err := bucketindex.ReadIndex(ctx, c.bucketClient, userID, c.cfgProvider, c.logger)

defer func() {
bucketindex.WriteSyncStatus(ctx, c.bucketClient, userID, idxs, userLogger)
if bucketIndexDeleted {
level.Info(userLogger).Log("msg", "deleting bucket index sync status since bucket index is empty")
if err := bucketindex.DeleteIndexSyncStatus(ctx, c.bucketClient, userID); err != nil {
level.Warn(userLogger).Log("msg", "error deleting index sync status when index is empty", "err", err)
}
} else {
bucketindex.WriteSyncStatus(ctx, c.bucketClient, userID, idxs, userLogger)
}
}()

if errors.Is(err, bucketindex.ErrIndexCorrupted) {
Expand Down Expand Up @@ -628,12 +637,20 @@ func (c *BlocksCleaner) cleanUser(ctx context.Context, userLogger log.Logger, us
level.Info(userLogger).Log("msg", "finish cleaning partial blocks", "duration", time.Since(begin), "duration_ms", time.Since(begin).Milliseconds())
}

// Upload the updated index to the storage.
begin = time.Now()
if err := bucketindex.WriteIndex(ctx, c.bucketClient, userID, c.cfgProvider, idx); err != nil {
return err
if idx.IsEmpty() && len(partials) == 0 {
level.Info(userLogger).Log("msg", "deleting bucket index since it is empty")
if err := bucketindex.DeleteIndex(ctx, c.bucketClient, userID, c.cfgProvider); err != nil {
return err
}
bucketIndexDeleted = true
} else {
// Upload the updated index to the storage.
begin = time.Now()
if err := bucketindex.WriteIndex(ctx, c.bucketClient, userID, c.cfgProvider, idx); err != nil {
return err
}
level.Info(userLogger).Log("msg", "finish writing new index", "duration", time.Since(begin), "duration_ms", time.Since(begin).Milliseconds())
}
level.Info(userLogger).Log("msg", "finish writing new index", "duration", time.Since(begin), "duration_ms", time.Since(begin).Milliseconds())

if c.cfg.ShardingStrategy == util.ShardingStrategyShuffle && c.cfg.CompactionStrategy == util.CompactionStrategyPartitioning {
begin = time.Now()
Expand Down
38 changes: 38 additions & 0 deletions pkg/compactor/blocks_cleaner_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -73,6 +73,7 @@ func TestBlockCleaner_KeyPermissionDenied(t *testing.T) {
path.Join(userID, "bucket-index.json.gz"): cortex_testutil.ErrKeyAccessDeniedError,
},
}
createTSDBBlock(t, bkt, userID, 10, 20, nil)

cfg := BlocksCleanerConfig{
DeletionDelay: deletionDelay,
Expand Down Expand Up @@ -901,6 +902,43 @@ func TestBlocksCleaner_CleanPartitionedGroupInfo(t *testing.T) {

}

func TestBlocksCleaner_DeleteEmptyBucketIndex(t *testing.T) {
bucketClient, _ := cortex_testutil.PrepareFilesystemBucket(t)
bucketClient = bucketindex.BucketWithGlobalMarkers(bucketClient)

userID := "user-1"

cfg := BlocksCleanerConfig{
DeletionDelay: time.Hour,
CleanupInterval: time.Minute,
CleanupConcurrency: 1,
}

ctx := context.Background()
logger := log.NewNopLogger()
reg := prometheus.NewPedanticRegistry()
scanner := tsdb.NewUsersScanner(bucketClient, tsdb.AllUsers, logger)
cfgProvider := newMockConfigProvider()
blocksMarkedForDeletion := promauto.With(reg).NewCounterVec(prometheus.CounterOpts{
Name: blocksMarkedForDeletionName,
Help: blocksMarkedForDeletionHelp,
}, append(commonLabels, reasonLabelName))
dummyGaugeVec := prometheus.NewGaugeVec(prometheus.GaugeOpts{}, []string{"test"})

cleaner := NewBlocksCleaner(cfg, bucketClient, scanner, 60*time.Second, cfgProvider, logger, "test-cleaner", reg, time.Minute, 30*time.Second, blocksMarkedForDeletion, dummyGaugeVec)

userBucket := bucket.NewUserBucketClient(userID, bucketClient, cfgProvider)

err := cleaner.cleanUser(ctx, logger, userBucket, userID, false)
require.NoError(t, err)

_, err = bucketindex.ReadIndex(ctx, bucketClient, userID, cfgProvider, logger)
require.ErrorIs(t, err, bucketindex.ErrIndexNotFound)

_, err = userBucket.WithExpectedErrs(userBucket.IsObjNotFoundErr).Get(ctx, bucketindex.SyncStatusFile)
require.True(t, userBucket.IsObjNotFoundErr(err))
}

type mockConfigProvider struct {
userRetentionPeriods map[string]time.Duration
}
Expand Down
21 changes: 11 additions & 10 deletions pkg/compactor/compactor_paritioning_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -193,7 +193,8 @@ func TestPartitionCompactor_SkipCompactionWhenCmkError(t *testing.T) {
bucketClient := &bucket.ClientMock{}
bucketClient.MockIter("", []string{userID}, nil)
bucketClient.MockIter("__markers__", []string{}, nil)
bucketClient.MockIter(userID+"/", []string{}, nil)
bucketClient.MockIter(userID+"/", []string{userID + "/01DTVP434PA9VFXSW2JKB3392D"}, nil)
bucketClient.MockGet(userID+"/01DTVP434PA9VFXSW2JKB3392D/meta.json", mockBlockMetaJSON("01DTVP434PA9VFXSW2JKB3392D"), nil)
bucketClient.MockIter(userID+"/markers/", nil, nil)
bucketClient.MockGet(userID+"/markers/cleaner-visit-marker.json", "", nil)
bucketClient.MockUpload(userID+"/markers/cleaner-visit-marker.json", nil)
Expand Down Expand Up @@ -374,7 +375,7 @@ func TestPartitionCompactor_ShouldIncrementCompactionErrorIfFailedToCompactASing
bucketClient := &bucket.ClientMock{}
bucketClient.MockIter("", []string{userID}, nil)
bucketClient.MockIter("__markers__", []string{}, nil)
bucketClient.MockIter(userID+"/", []string{userID + "/01DTVP434PA9VFXSW2JKB3392D/meta.json", userID + "/01FN6CDF3PNEWWRY5MPGJPE3EX/meta.json"}, nil)
bucketClient.MockIter(userID+"/", []string{userID + "/01DTVP434PA9VFXSW2JKB3392D", userID + "/01FN6CDF3PNEWWRY5MPGJPE3EX", userID + "/01DTVP434PA9VFXSW2JKB3392D/meta.json", userID + "/01FN6CDF3PNEWWRY5MPGJPE3EX/meta.json"}, nil)
bucketClient.MockIter(userID+"/markers/", nil, nil)
bucketClient.MockGet(userID+"/markers/cleaner-visit-marker.json", "", nil)
bucketClient.MockUpload(userID+"/markers/cleaner-visit-marker.json", nil)
Expand Down Expand Up @@ -436,7 +437,7 @@ func TestPartitionCompactor_ShouldCompactAndRemoveUserFolder(t *testing.T) {
bucketClient.MockIter("__markers__", []string{}, nil)
bucketClient.MockExists(cortex_tsdb.GetGlobalDeletionMarkPath("user-1"), false, nil)
bucketClient.MockExists(cortex_tsdb.GetLocalDeletionMarkPath("user-1"), false, nil)
bucketClient.MockIter("user-1/", []string{"user-1/01DTVP434PA9VFXSW2JKB3392D/meta.json", "user-1/01FN6CDF3PNEWWRY5MPGJPE3EX/meta.json"}, nil)
bucketClient.MockIter("user-1/", []string{"user-1/01DTVP434PA9VFXSW2JKB3392D", "user-1/01DTVP434PA9VFXSW2JKB3392D/meta.json", "user-1/01FN6CDF3PNEWWRY5MPGJPE3EX/meta.json"}, nil)
bucketClient.MockIter("user-1/markers/", nil, nil)
bucketClient.MockGet("user-1/markers/cleaner-visit-marker.json", "", nil)
bucketClient.MockUpload("user-1/markers/cleaner-visit-marker.json", nil)
Expand Down Expand Up @@ -493,8 +494,8 @@ func TestPartitionCompactor_ShouldIterateOverUsersAndRunCompaction(t *testing.T)
bucketClient.MockExists(cortex_tsdb.GetLocalDeletionMarkPath("user-1"), false, nil)
bucketClient.MockExists(cortex_tsdb.GetGlobalDeletionMarkPath("user-2"), false, nil)
bucketClient.MockExists(cortex_tsdb.GetLocalDeletionMarkPath("user-2"), false, nil)
bucketClient.MockIter("user-1/", []string{"user-1/01DTVP434PA9VFXSW2JKB3392D/meta.json", "user-1/01FN6CDF3PNEWWRY5MPGJPE3EX/meta.json"}, nil)
bucketClient.MockIter("user-2/", []string{"user-2/01DTW0ZCPDDNV4BV83Q2SV4QAZ/meta.json", "user-2/01FN3V83ABR9992RF8WRJZ76ZQ/meta.json"}, nil)
bucketClient.MockIter("user-1/", []string{"user-1/01DTVP434PA9VFXSW2JKB3392D", "user-1/01DTVP434PA9VFXSW2JKB3392D/meta.json", "user-1/01FN6CDF3PNEWWRY5MPGJPE3EX/meta.json"}, nil)
bucketClient.MockIter("user-2/", []string{"user-2/01DTW0ZCPDDNV4BV83Q2SV4QAZ", "user-2/01DTW0ZCPDDNV4BV83Q2SV4QAZ/meta.json", "user-2/01FN3V83ABR9992RF8WRJZ76ZQ/meta.json"}, nil)
bucketClient.MockIter("user-1/markers/", nil, nil)
bucketClient.MockGet("user-1/markers/cleaner-visit-marker.json", "", nil)
bucketClient.MockUpload("user-1/markers/cleaner-visit-marker.json", nil)
Expand Down Expand Up @@ -767,8 +768,8 @@ func TestPartitionCompactor_ShouldNotCompactBlocksMarkedForSkipCompact(t *testin
bucketClient.MockExists(cortex_tsdb.GetLocalDeletionMarkPath("user-1"), false, nil)
bucketClient.MockExists(cortex_tsdb.GetGlobalDeletionMarkPath("user-2"), false, nil)
bucketClient.MockExists(cortex_tsdb.GetLocalDeletionMarkPath("user-2"), false, nil)
bucketClient.MockIter("user-1/", []string{"user-1/01DTVP434PA9VFXSW2JKB3392D/meta.json", "user-1/01FN6CDF3PNEWWRY5MPGJPE3EX/meta.json"}, nil)
bucketClient.MockIter("user-2/", []string{"user-2/01DTW0ZCPDDNV4BV83Q2SV4QAZ/meta.json", "user-2/01FN3V83ABR9992RF8WRJZ76ZQ/meta.json"}, nil)
bucketClient.MockIter("user-1/", []string{"user-1/01DTVP434PA9VFXSW2JKB3392D", "user-1/01DTVP434PA9VFXSW2JKB3392D/meta.json", "user-1/01FN6CDF3PNEWWRY5MPGJPE3EX/meta.json"}, nil)
bucketClient.MockIter("user-2/", []string{"user-2/01DTW0ZCPDDNV4BV83Q2SV4QAZ", "user-2/01DTW0ZCPDDNV4BV83Q2SV4QAZ/meta.json", "user-2/01FN3V83ABR9992RF8WRJZ76ZQ/meta.json"}, nil)
bucketClient.MockIter("user-1/markers/", nil, nil)
bucketClient.MockGet("user-1/markers/cleaner-visit-marker.json", "", nil)
bucketClient.MockUpload("user-1/markers/cleaner-visit-marker.json", nil)
Expand Down Expand Up @@ -1028,8 +1029,8 @@ func TestPartitionCompactor_ShouldCompactAllUsersOnShardingEnabledButOnlyOneInst
bucketClient.MockExists(cortex_tsdb.GetLocalDeletionMarkPath("user-1"), false, nil)
bucketClient.MockExists(cortex_tsdb.GetGlobalDeletionMarkPath("user-2"), false, nil)
bucketClient.MockExists(cortex_tsdb.GetLocalDeletionMarkPath("user-2"), false, nil)
bucketClient.MockIter("user-1/", []string{"user-1/01DTVP434PA9VFXSW2JKB3392D/meta.json", "user-1/01FN6CDF3PNEWWRY5MPGJPE3EX/meta.json"}, nil)
bucketClient.MockIter("user-2/", []string{"user-2/01DTW0ZCPDDNV4BV83Q2SV4QAZ/meta.json", "user-2/01FN3V83ABR9992RF8WRJZ76ZQ/meta.json"}, nil)
bucketClient.MockIter("user-1/", []string{"user-1/01DTVP434PA9VFXSW2JKB3392D", "user-1/01DTVP434PA9VFXSW2JKB3392D/meta.json", "user-1/01FN6CDF3PNEWWRY5MPGJPE3EX/meta.json"}, nil)
bucketClient.MockIter("user-2/", []string{"user-2/01DTW0ZCPDDNV4BV83Q2SV4QAZ", "user-2/01DTW0ZCPDDNV4BV83Q2SV4QAZ/meta.json", "user-2/01FN3V83ABR9992RF8WRJZ76ZQ/meta.json"}, nil)
bucketClient.MockIter("user-1/markers/", nil, nil)
bucketClient.MockGet("user-1/markers/cleaner-visit-marker.json", "", nil)
bucketClient.MockUpload("user-1/markers/cleaner-visit-marker.json", nil)
Expand Down Expand Up @@ -1278,7 +1279,7 @@ func TestPartitionCompactor_ShouldCompactOnlyShardsOwnedByTheInstanceOnShardingE
bucketClient.MockGetRequireUpload(userID+"/partitioned-groups/visit-marks/"+fmt.Sprint(groupHash)+"/partition-0-visit-mark.json", string(visitMarkerFileContent), nil)
bucketClient.MockUpload(userID+"/partitioned-groups/visit-marks/"+fmt.Sprint(groupHash)+"/partition-0-visit-mark.json", nil)
// Iter with recursive so expected to get objects rather than directories.
blockFiles = append(blockFiles, path.Join(userID, blockID, block.MetaFilename))
blockFiles = append(blockFiles, path.Join(userID, blockID), path.Join(userID, blockID, block.MetaFilename))

// Get all of the unique group hashes so that they can be used to ensure all groups were compacted
groupHashes[groupHash]++
Expand Down
21 changes: 11 additions & 10 deletions pkg/compactor/compactor_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -171,7 +171,8 @@ func TestCompactor_SkipCompactionWhenCmkError(t *testing.T) {
bucketClient := &bucket.ClientMock{}
bucketClient.MockIter("", []string{userID}, nil)
bucketClient.MockIter("__markers__", []string{}, nil)
bucketClient.MockIter(userID+"/", []string{}, nil)
bucketClient.MockIter(userID+"/", []string{userID + "/01DTVP434PA9VFXSW2JKB3392D"}, nil)
bucketClient.MockGet(userID+"/01DTVP434PA9VFXSW2JKB3392D/meta.json", mockBlockMetaJSON("01DTVP434PA9VFXSW2JKB3392D"), nil)
bucketClient.MockIter(userID+"/markers/", nil, nil)
bucketClient.MockGet(userID+"/markers/cleaner-visit-marker.json", "", nil)
bucketClient.MockUpload(userID+"/markers/cleaner-visit-marker.json", nil)
Expand Down Expand Up @@ -351,7 +352,7 @@ func TestCompactor_ShouldIncrementCompactionErrorIfFailedToCompactASingleTenant(
bucketClient := &bucket.ClientMock{}
bucketClient.MockIter("", []string{userID}, nil)
bucketClient.MockIter("__markers__", []string{}, nil)
bucketClient.MockIter(userID+"/", []string{userID + "/01DTVP434PA9VFXSW2JKB3392D/meta.json", userID + "/01FN6CDF3PNEWWRY5MPGJPE3EX/meta.json"}, nil)
bucketClient.MockIter(userID+"/", []string{userID + "/01DTVP434PA9VFXSW2JKB3392D", userID + "/01FN6CDF3PNEWWRY5MPGJPE3EX", userID + "/01DTVP434PA9VFXSW2JKB3392D/meta.json", userID + "/01FN6CDF3PNEWWRY5MPGJPE3EX/meta.json"}, nil)
bucketClient.MockIter(userID+"/markers/", nil, nil)
bucketClient.MockGet(userID+"/markers/cleaner-visit-marker.json", "", nil)
bucketClient.MockUpload(userID+"/markers/cleaner-visit-marker.json", nil)
Expand Down Expand Up @@ -409,7 +410,7 @@ func TestCompactor_ShouldCompactAndRemoveUserFolder(t *testing.T) {
bucketClient.MockIter("__markers__", []string{}, nil)
bucketClient.MockExists(cortex_tsdb.GetGlobalDeletionMarkPath("user-1"), false, nil)
bucketClient.MockExists(cortex_tsdb.GetLocalDeletionMarkPath("user-1"), false, nil)
bucketClient.MockIter("user-1/", []string{"user-1/01DTVP434PA9VFXSW2JKB3392D/meta.json", "user-1/01FN6CDF3PNEWWRY5MPGJPE3EX/meta.json"}, nil)
bucketClient.MockIter("user-1/", []string{"user-1/01DTVP434PA9VFXSW2JKB3392D", "user-1/01DTVP434PA9VFXSW2JKB3392D/meta.json", "user-1/01FN6CDF3PNEWWRY5MPGJPE3EX/meta.json"}, nil)
bucketClient.MockIter("user-1/markers/", nil, nil)
bucketClient.MockGet("user-1/markers/cleaner-visit-marker.json", "", nil)
bucketClient.MockUpload("user-1/markers/cleaner-visit-marker.json", nil)
Expand Down Expand Up @@ -460,8 +461,8 @@ func TestCompactor_ShouldIterateOverUsersAndRunCompaction(t *testing.T) {
bucketClient.MockExists(cortex_tsdb.GetLocalDeletionMarkPath("user-1"), false, nil)
bucketClient.MockExists(cortex_tsdb.GetGlobalDeletionMarkPath("user-2"), false, nil)
bucketClient.MockExists(cortex_tsdb.GetLocalDeletionMarkPath("user-2"), false, nil)
bucketClient.MockIter("user-1/", []string{"user-1/01DTVP434PA9VFXSW2JKB3392D/meta.json", "user-1/01FN6CDF3PNEWWRY5MPGJPE3EX/meta.json"}, nil)
bucketClient.MockIter("user-2/", []string{"user-2/01DTW0ZCPDDNV4BV83Q2SV4QAZ/meta.json", "user-2/01FN3V83ABR9992RF8WRJZ76ZQ/meta.json"}, nil)
bucketClient.MockIter("user-1/", []string{"user-1/01DTVP434PA9VFXSW2JKB3392D", "user-1/01DTVP434PA9VFXSW2JKB3392D/meta.json", "user-1/01FN6CDF3PNEWWRY5MPGJPE3EX/meta.json"}, nil)
bucketClient.MockIter("user-2/", []string{"user-2/01DTW0ZCPDDNV4BV83Q2SV4QAZ", "user-2/01DTW0ZCPDDNV4BV83Q2SV4QAZ/meta.json", "user-2/01FN3V83ABR9992RF8WRJZ76ZQ/meta.json"}, nil)
bucketClient.MockIter("user-1/markers/", nil, nil)
bucketClient.MockGet("user-1/markers/cleaner-visit-marker.json", "", nil)
bucketClient.MockUpload("user-1/markers/cleaner-visit-marker.json", nil)
Expand Down Expand Up @@ -725,8 +726,8 @@ func TestCompactor_ShouldNotCompactBlocksMarkedForSkipCompact(t *testing.T) {
bucketClient.MockExists(cortex_tsdb.GetLocalDeletionMarkPath("user-1"), false, nil)
bucketClient.MockExists(cortex_tsdb.GetGlobalDeletionMarkPath("user-2"), false, nil)
bucketClient.MockExists(cortex_tsdb.GetLocalDeletionMarkPath("user-2"), false, nil)
bucketClient.MockIter("user-1/", []string{"user-1/01DTVP434PA9VFXSW2JKB3392D/meta.json", "user-1/01FN6CDF3PNEWWRY5MPGJPE3EX/meta.json"}, nil)
bucketClient.MockIter("user-2/", []string{"user-2/01DTW0ZCPDDNV4BV83Q2SV4QAZ/meta.json", "user-2/01FN3V83ABR9992RF8WRJZ76ZQ/meta.json"}, nil)
bucketClient.MockIter("user-1/", []string{"user-1/01DTVP434PA9VFXSW2JKB3392D", "user-1/01DTVP434PA9VFXSW2JKB3392D/meta.json", "user-1/01FN6CDF3PNEWWRY5MPGJPE3EX/meta.json"}, nil)
bucketClient.MockIter("user-2/", []string{"user-2/01DTW0ZCPDDNV4BV83Q2SV4QAZ", "user-2/01DTW0ZCPDDNV4BV83Q2SV4QAZ/meta.json", "user-2/01FN3V83ABR9992RF8WRJZ76ZQ/meta.json"}, nil)
bucketClient.MockIter("user-1/markers/", nil, nil)
bucketClient.MockGet("user-1/markers/cleaner-visit-marker.json", "", nil)
bucketClient.MockUpload("user-1/markers/cleaner-visit-marker.json", nil)
Expand Down Expand Up @@ -973,8 +974,8 @@ func TestCompactor_ShouldCompactAllUsersOnShardingEnabledButOnlyOneInstanceRunni
bucketClient.MockExists(cortex_tsdb.GetLocalDeletionMarkPath("user-1"), false, nil)
bucketClient.MockExists(cortex_tsdb.GetGlobalDeletionMarkPath("user-2"), false, nil)
bucketClient.MockExists(cortex_tsdb.GetLocalDeletionMarkPath("user-2"), false, nil)
bucketClient.MockIter("user-1/", []string{"user-1/01DTVP434PA9VFXSW2JKB3392D/meta.json", "user-1/01FN6CDF3PNEWWRY5MPGJPE3EX/meta.json"}, nil)
bucketClient.MockIter("user-2/", []string{"user-2/01DTW0ZCPDDNV4BV83Q2SV4QAZ/meta.json", "user-2/01FN3V83ABR9992RF8WRJZ76ZQ/meta.json"}, nil)
bucketClient.MockIter("user-1/", []string{"user-1/01DTVP434PA9VFXSW2JKB3392D", "user-1/01DTVP434PA9VFXSW2JKB3392D/meta.json", "user-1/01FN6CDF3PNEWWRY5MPGJPE3EX/meta.json"}, nil)
bucketClient.MockIter("user-2/", []string{"user-2/01DTW0ZCPDDNV4BV83Q2SV4QAZ", "user-2/01DTW0ZCPDDNV4BV83Q2SV4QAZ/meta.json", "user-2/01FN3V83ABR9992RF8WRJZ76ZQ/meta.json"}, nil)
bucketClient.MockIter("user-1/markers/", nil, nil)
bucketClient.MockGet("user-1/markers/cleaner-visit-marker.json", "", nil)
bucketClient.MockUpload("user-1/markers/cleaner-visit-marker.json", nil)
Expand Down Expand Up @@ -1210,7 +1211,7 @@ func TestCompactor_ShouldCompactOnlyShardsOwnedByTheInstanceOnShardingEnabledWit
bucketClient.MockGetRequireUpload(userID+"/"+blockID+"/visit-mark.json", string(visitMarkerFileContent), nil)
bucketClient.MockUpload(userID+"/"+blockID+"/visit-mark.json", nil)
// Iter with recursive so expected to get objects rather than directories.
blockFiles = append(blockFiles, path.Join(userID, blockID, block.MetaFilename))
blockFiles = append(blockFiles, path.Join(userID, blockID), path.Join(userID, blockID, block.MetaFilename))

// Get all of the unique group hashes so that they can be used to ensure all groups were compacted
groupHash := hashGroup(userID, blockTimes["startTime"], blockTimes["endTime"])
Expand Down
4 changes: 4 additions & 0 deletions pkg/storage/tsdb/bucketindex/index.go
Original file line number Diff line number Diff line change
Expand Up @@ -64,6 +64,10 @@ func (idx *Index) RemoveBlock(id ulid.ULID) {
}
}

func (idx *Index) IsEmpty() bool {
return len(idx.Blocks) == 0 && len(idx.BlockDeletionMarks) == 0
}

// Block holds the information about a block in the index.
type Block struct {
// Block ID.
Expand Down

0 comments on commit d8cb7f1

Please sign in to comment.