Skip to content
Merged
Show file tree
Hide file tree
Changes from 4 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -92,6 +92,7 @@
* [BUGFIX] Use a valid grpc header when logging IP addresses. #3307
* [BUGFIX] Fixed the metric `cortex_prometheus_rule_group_duration_seconds` in the Ruler, it wouldn't report any values. #3310
* [BUGFIX] Fixed gRPC connections leaking in rulers when rulers sharding is enabled and APIs called. #3314
* [BUGFIX] Block Storage: Avoid deletion of blocks which are not shipped. #3346

## 1.4.0 / 2020-10-02

Expand Down
33 changes: 33 additions & 0 deletions pkg/ingester/ingester_v2.go
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,7 @@ import (
"time"

"github.com/go-kit/kit/log/level"
"github.com/oklog/ulid"
"github.com/pkg/errors"
"github.com/prometheus/client_golang/prometheus"
"github.com/prometheus/client_golang/prometheus/promauto"
Expand Down Expand Up @@ -109,6 +110,37 @@ func (u *userTSDB) PostDeletion(metrics ...labels.Labels) {
}
}

// blocksToDelete returns the blocks which have been shipped.
func (u *userTSDB) blocksToDelete(blocks []*tsdb.Block) map[ulid.ULID]struct{} {
if u.DB == nil {
return map[ulid.ULID]struct{}{}
}
deletable := tsdb.DefaultBlocksToDelete(u.DB)(blocks)
if u.shipper == nil {
return deletable
}

shipperMeta, err := shipper.ReadMetaFile(u.Dir())
if err != nil {
// If there is any issue with the shipper, we should be conservative and not delete anything.
level.Error(util.Logger).Log("msg", "failed to read shipper meta during deletion of blocks", "user", u.userID, "err", err)
return map[ulid.ULID]struct{}{}
}

Outer:
for id := range deletable {
for _, shippedID := range shipperMeta.Uploaded {
if shippedID == id {
continue Outer
}
}
// Not shipped yet.
delete(deletable, id)
}

return deletable
}

func (u *userTSDB) isIdle(now time.Time, idle time.Duration) bool {
lu := u.lastUpdate.Load()

Expand Down Expand Up @@ -965,6 +997,7 @@ func (i *Ingester) createTSDB(userID string) (*userTSDB, error) {
StripeSize: i.cfg.BlocksStorageConfig.TSDB.StripeSize,
WALCompression: i.cfg.BlocksStorageConfig.TSDB.WALCompressionEnabled,
SeriesLifecycleCallback: userDB,
BlocksToDelete: userDB.blocksToDelete,
})
if err != nil {
return nil, errors.Wrapf(err, "failed to open TSDB: %s", udir)
Expand Down