diff --git a/CHANGELOG.md b/CHANGELOG.md
index 6e078d282e4..2ae087cbc19 100644
--- a/CHANGELOG.md
+++ b/CHANGELOG.md
@@ -96,6 +96,7 @@
 * [BUGFIX] Fixed shuffle sharding consistency when zone-awareness is enabled and the shard size is increased or instances in a new zone are added. #3299
 * [BUGFIX] Fixed Gossip memberlist members joining when addresses are configured using DNS-based service discovery. #3360
 * [BUGFIX] Ingester: fail to start an ingester running the blocks storage, if unable to load any existing TSDB at startup. #3354
+* [BUGFIX] Blocks storage: Avoid deletion of blocks in the ingester which are not shipped to the storage yet. #3346
 
 ## 1.4.0 / 2020-10-02
 
diff --git a/pkg/ingester/ingester_v2.go b/pkg/ingester/ingester_v2.go
index 249361a88c0..1851fd79500 100644
--- a/pkg/ingester/ingester_v2.go
+++ b/pkg/ingester/ingester_v2.go
@@ -11,6 +11,7 @@ import (
 	"time"
 
 	"github.com/go-kit/kit/log/level"
+	"github.com/oklog/ulid"
 	"github.com/pkg/errors"
 	"github.com/prometheus/client_golang/prometheus"
 	"github.com/prometheus/client_golang/prometheus/promauto"
@@ -110,6 +111,32 @@ func (u *userTSDB) PostDeletion(metrics ...labels.Labels) {
 	}
 }
 
+// blocksToDelete filters the input blocks and returns only those which are safe to delete from the ingester, i.e. blocks already shipped to the storage.
+func (u *userTSDB) blocksToDelete(blocks []*tsdb.Block) map[ulid.ULID]struct{} {
+	if u.DB == nil {
+		return nil
+	}
+	deletable := tsdb.DefaultBlocksToDelete(u.DB)(blocks)
+	if u.shipper == nil {
+		return deletable
+	}
+
+	shipperMeta, err := shipper.ReadMetaFile(u.Dir())
+	if err != nil {
+		// If we fail to read the shipper meta, be conservative and do not delete anything.
+		level.Error(util.Logger).Log("msg", "failed to read shipper meta during deletion of blocks", "user", u.userID, "err", err)
+		return nil
+	}
+
+	result := map[ulid.ULID]struct{}{}
+	for _, shippedID := range shipperMeta.Uploaded {
+		if _, ok := deletable[shippedID]; ok {
+			result[shippedID] = struct{}{}
+		}
+	}
+	return result
+}
+
 func (u *userTSDB) isIdle(now time.Time, idle time.Duration) bool {
 	lu := u.lastUpdate.Load()
 
@@ -972,6 +999,7 @@ func (i *Ingester) createTSDB(userID string) (*userTSDB, error) {
 		StripeSize:              i.cfg.BlocksStorageConfig.TSDB.StripeSize,
 		WALCompression:          i.cfg.BlocksStorageConfig.TSDB.WALCompressionEnabled,
 		SeriesLifecycleCallback: userDB,
+		BlocksToDelete:          userDB.blocksToDelete,
 	})
 	if err != nil {
 		return nil, errors.Wrapf(err, "failed to open TSDB: %s", udir)
diff --git a/pkg/ingester/ingester_v2_test.go b/pkg/ingester/ingester_v2_test.go
index aa59263a2e0..8e7a9c57089 100644
--- a/pkg/ingester/ingester_v2_test.go
+++ b/pkg/ingester/ingester_v2_test.go
@@ -17,6 +17,7 @@ import (
 	"testing"
 	"time"
 
+	"github.com/oklog/ulid"
 	"github.com/pkg/errors"
 	"github.com/prometheus/client_golang/prometheus"
 	"github.com/prometheus/client_golang/prometheus/testutil"
@@ -27,6 +28,7 @@ import (
 	"github.com/stretchr/testify/assert"
 	"github.com/stretchr/testify/mock"
 	"github.com/stretchr/testify/require"
+	"github.com/thanos-io/thanos/pkg/shipper"
 	"github.com/weaveworks/common/httpgrpc"
 	"github.com/weaveworks/common/middleware"
 	"github.com/weaveworks/common/user"
@@ -2213,3 +2215,87 @@ func TestIngester_CloseTSDBsOnShutdown(t *testing.T) {
 	db = i.getTSDB(userID)
 	require.Nil(t, db)
 }
+
+func TestIngesterNotDeleteUnshippedBlocks(t *testing.T) {
+	chunkRange := 2 * time.Hour
+	chunkRangeMilliSec := chunkRange.Milliseconds()
+	cfg := defaultIngesterTestConfig()
+	cfg.BlocksStorageConfig.TSDB.BlockRanges = []time.Duration{chunkRange}
+	cfg.BlocksStorageConfig.TSDB.Retention = time.Millisecond // Retention of just 1ms means the default policy would delete all but the most recent block.
+	cfg.LifecyclerConfig.JoinAfter = 0
+
+	// Create ingester.
+	i, cleanup, err := newIngesterMockWithTSDBStorage(cfg, nil)
+	require.NoError(t, err)
+	t.Cleanup(cleanup)
+
+	require.NoError(t, services.StartAndAwaitRunning(context.Background(), i))
+	t.Cleanup(func() {
+		_ = services.StopAndAwaitTerminated(context.Background(), i)
+	})
+
+	// Wait until it's ACTIVE.
+	test.Poll(t, 10*time.Millisecond, ring.ACTIVE, func() interface{} {
+		return i.lifecycler.GetState()
+	})
+
+	// Push some data to create 3 blocks.
+	ctx := user.InjectOrgID(context.Background(), userID)
+	for j := int64(0); j < 5; j++ {
+		req, _, _ := mockWriteRequest(labels.Labels{{Name: labels.MetricName, Value: "test"}}, 0, j*chunkRangeMilliSec)
+		_, err := i.v2Push(ctx, req)
+		require.NoError(t, err)
+	}
+
+	db := i.getTSDB(userID)
+	require.NotNil(t, db)
+	require.Nil(t, db.Compact())
+
+	oldBlocks := db.Blocks()
+	require.Equal(t, 3, len(oldBlocks))
+
+	// Mark the second block as shipped, so only that one becomes eligible for deletion.
+	require.Nil(t, shipper.WriteMetaFile(nil, db.Dir(), &shipper.Meta{
+		Version:  shipper.MetaVersion1,
+		Uploaded: []ulid.ULID{oldBlocks[1].Meta().ULID},
+	}))
+
+	// Add more samples that could trigger another compaction and hence reload of blocks.
+	for j := int64(5); j < 6; j++ {
+		req, _, _ := mockWriteRequest(labels.Labels{{Name: labels.MetricName, Value: "test"}}, 0, j*chunkRangeMilliSec)
+		_, err := i.v2Push(ctx, req)
+		require.NoError(t, err)
+	}
+	require.Nil(t, db.Compact())
+
+	// Only the second (shipped) block should be gone, replaced by a newly compacted block.
+	newBlocks := db.Blocks()
+	require.Equal(t, 3, len(newBlocks))
+	require.Equal(t, oldBlocks[0].Meta().ULID, newBlocks[0].Meta().ULID)    // First block remains the same.
+	require.Equal(t, oldBlocks[2].Meta().ULID, newBlocks[1].Meta().ULID)    // Third block is now the second one.
+	require.NotEqual(t, oldBlocks[1].Meta().ULID, newBlocks[2].Meta().ULID) // The new block does not match the deleted second block.
+
+	// Ship two more blocks, so that all blocks from the first round are now marked as shipped.
+	require.Nil(t, shipper.WriteMetaFile(nil, db.Dir(), &shipper.Meta{
+		Version:  shipper.MetaVersion1,
+		Uploaded: []ulid.ULID{oldBlocks[1].Meta().ULID, newBlocks[0].Meta().ULID, newBlocks[1].Meta().ULID},
+	}))
+
+	// Add more samples that could trigger another compaction and hence reload of blocks.
+	for j := int64(6); j < 7; j++ {
+		req, _, _ := mockWriteRequest(labels.Labels{{Name: labels.MetricName, Value: "test"}}, 0, j*chunkRangeMilliSec)
+		_, err := i.v2Push(ctx, req)
+		require.NoError(t, err)
+	}
+	require.Nil(t, db.Compact())
+
+	// All blocks from the first round should be gone now.
+	newBlocks2 := db.Blocks()
+	require.Equal(t, 2, len(newBlocks2))
+
+	require.Equal(t, newBlocks[2].Meta().ULID, newBlocks2[0].Meta().ULID) // Block created in the last round.
+	for _, b := range oldBlocks {
+		// The second remaining block is newly created and must not match any of the old blocks.
+		require.NotEqual(t, b.Meta().ULID, newBlocks2[1].Meta().ULID)
+	}
+}
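
For readers skimming the patch, the heart of the change is a small set intersection: TSDB's default retention decides which blocks *could* be deleted, and the shipper's meta file decides which of those have actually been uploaded; only the intersection is deleted. The standalone sketch below illustrates just that filtering step, using plain string IDs instead of `ulid.ULID` and the Thanos shipper meta file; `intersectShipped`, `deletable` and `uploaded` are illustrative names, not part of the Cortex codebase.

```go
package main

import "fmt"

// intersectShipped keeps only the deletable block IDs that the shipper has
// already uploaded; unshipped blocks stay on the ingester's disk.
// (Sketch only: the real code keys on ulid.ULID and reads the shipper meta file.)
func intersectShipped(deletable map[string]struct{}, uploaded []string) map[string]struct{} {
	result := map[string]struct{}{}
	for _, id := range uploaded {
		if _, ok := deletable[id]; ok {
			result[id] = struct{}{}
		}
	}
	return result
}

func main() {
	// Default retention would delete both old blocks...
	deletable := map[string]struct{}{"block-1": {}, "block-2": {}}
	// ...but only block-2 has been uploaded to object storage so far.
	uploaded := []string{"block-2"}

	fmt.Println(intersectShipped(deletable, uploaded)) // map[block-2:{}]
}
```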