Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -96,6 +96,7 @@
* [BUGFIX] Fixed shuffle sharding consistency when zone-awareness is enabled and the shard size is increased or instances in a new zone are added. #3299
* [BUGFIX] Fixed Gossip memberlist members joining when addresses are configured using DNS-based service discovery. #3360
* [BUGFIX] Ingester: fail to start an ingester running the blocks storage, if unable to load any existing TSDB at startup. #3354
* [BUGFIX] Blocks storage: Avoid deletion of blocks in the ingester which are not shipped to the storage yet. #3346

## 1.4.0 / 2020-10-02

Expand Down
28 changes: 28 additions & 0 deletions pkg/ingester/ingester_v2.go
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,7 @@ import (
"time"

"github.com/go-kit/kit/log/level"
"github.com/oklog/ulid"
"github.com/pkg/errors"
"github.com/prometheus/client_golang/prometheus"
"github.com/prometheus/client_golang/prometheus/promauto"
Expand Down Expand Up @@ -110,6 +111,32 @@ func (u *userTSDB) PostDeletion(metrics ...labels.Labels) {
}
}

// blocksToDelete filters the input blocks and returns the blocks which are safe to be deleted from the ingester.
func (u *userTSDB) blocksToDelete(blocks []*tsdb.Block) map[ulid.ULID]struct{} {
if u.DB == nil {
return nil
}
deletable := tsdb.DefaultBlocksToDelete(u.DB)(blocks)
if u.shipper == nil {
return deletable
}

shipperMeta, err := shipper.ReadMetaFile(u.Dir())
if err != nil {
// If there is any issue with the shipper, we should be conservative and not delete anything.
level.Error(util.Logger).Log("msg", "failed to read shipper meta during deletion of blocks", "user", u.userID, "err", err)
return nil
Comment on lines +126 to +128
Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Should we have a metric for this to alert on? Because if this issue persists, queries could skip some data and the disk can run out of space.

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

In our alerts we do alert on disk running out of space. I'd say that + error message is enough.

}

result := map[ulid.ULID]struct{}{}
for _, shippedID := range shipperMeta.Uploaded {
if _, ok := deletable[shippedID]; ok {
result[shippedID] = struct{}{}
}
}
return result
}

func (u *userTSDB) isIdle(now time.Time, idle time.Duration) bool {
lu := u.lastUpdate.Load()

Expand Down Expand Up @@ -972,6 +999,7 @@ func (i *Ingester) createTSDB(userID string) (*userTSDB, error) {
StripeSize: i.cfg.BlocksStorageConfig.TSDB.StripeSize,
WALCompression: i.cfg.BlocksStorageConfig.TSDB.WALCompressionEnabled,
SeriesLifecycleCallback: userDB,
BlocksToDelete: userDB.blocksToDelete,
})
if err != nil {
return nil, errors.Wrapf(err, "failed to open TSDB: %s", udir)
Expand Down
86 changes: 86 additions & 0 deletions pkg/ingester/ingester_v2_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,7 @@ import (
"testing"
"time"

"github.com/oklog/ulid"
"github.com/pkg/errors"
"github.com/prometheus/client_golang/prometheus"
"github.com/prometheus/client_golang/prometheus/testutil"
Expand All @@ -27,6 +28,7 @@ import (
"github.com/stretchr/testify/assert"
"github.com/stretchr/testify/mock"
"github.com/stretchr/testify/require"
"github.com/thanos-io/thanos/pkg/shipper"
"github.com/weaveworks/common/httpgrpc"
"github.com/weaveworks/common/middleware"
"github.com/weaveworks/common/user"
Expand Down Expand Up @@ -2213,3 +2215,87 @@ func TestIngester_CloseTSDBsOnShutdown(t *testing.T) {
db = i.getTSDB(userID)
require.Nil(t, db)
}

func TestIngesterNotDeleteUnshippedBlocks(t *testing.T) {
	chunkRange := 2 * time.Hour
	chunkRangeMilliSec := chunkRange.Milliseconds()
	cfg := defaultIngesterTestConfig()
	cfg.BlocksStorageConfig.TSDB.BlockRanges = []time.Duration{chunkRange}
	cfg.BlocksStorageConfig.TSDB.Retention = time.Millisecond // Which means delete all but first block.
	cfg.LifecyclerConfig.JoinAfter = 0

	// Start the ingester and make sure it gets stopped once the test completes.
	i, cleanup, err := newIngesterMockWithTSDBStorage(cfg, nil)
	require.NoError(t, err)
	t.Cleanup(cleanup)

	require.NoError(t, services.StartAndAwaitRunning(context.Background(), i))
	t.Cleanup(func() {
		_ = services.StopAndAwaitTerminated(context.Background(), i)
	})

	// Wait until the ingester has become ACTIVE in the ring.
	test.Poll(t, 10*time.Millisecond, ring.ACTIVE, func() interface{} {
		return i.lifecycler.GetState()
	})

	ctx := user.InjectOrgID(context.Background(), userID)

	// pushSample writes a single sample at the given chunk-range offset.
	pushSample := func(offset int64) {
		req, _, _ := mockWriteRequest(labels.Labels{{Name: labels.MetricName, Value: "test"}}, 0, offset*chunkRangeMilliSec)
		_, err := i.v2Push(ctx, req)
		require.NoError(t, err)
	}

	// Push samples spanning enough chunk ranges to create 3 blocks on compaction.
	for offset := int64(0); offset < 5; offset++ {
		pushSample(offset)
	}

	db := i.getTSDB(userID)
	require.NotNil(t, db)
	require.Nil(t, db.Compact())

	oldBlocks := db.Blocks()
	require.Equal(t, 3, len(oldBlocks))

	// Mark only the second block as shipped, so it's the only one eligible for deletion.
	require.Nil(t, shipper.WriteMetaFile(nil, db.Dir(), &shipper.Meta{
		Version:  shipper.MetaVersion1,
		Uploaded: []ulid.ULID{oldBlocks[1].Meta().ULID},
	}))

	// One more sample so the next compaction produces a new block and reloads the set.
	pushSample(5)
	require.Nil(t, db.Compact())

	// The second (shipped) block must be gone, with a new block appended at the tail.
	newBlocks := db.Blocks()
	require.Equal(t, 3, len(newBlocks))
	require.Equal(t, oldBlocks[0].Meta().ULID, newBlocks[0].Meta().ULID)    // First block remains same.
	require.Equal(t, oldBlocks[2].Meta().ULID, newBlocks[1].Meta().ULID)    // 3rd block becomes 2nd now.
	require.NotEqual(t, oldBlocks[1].Meta().ULID, newBlocks[2].Meta().ULID) // The new block won't match previous 2nd block.

	// Now mark every remaining block from the first round as shipped too.
	require.Nil(t, shipper.WriteMetaFile(nil, db.Dir(), &shipper.Meta{
		Version:  shipper.MetaVersion1,
		Uploaded: []ulid.ULID{oldBlocks[1].Meta().ULID, newBlocks[0].Meta().ULID, newBlocks[1].Meta().ULID},
	}))

	// Trigger one more compaction and reload of blocks.
	pushSample(6)
	require.Nil(t, db.Compact())

	// Every block from the first round should be deleted by now.
	newBlocks2 := db.Blocks()
	require.Equal(t, 2, len(newBlocks2))

	require.Equal(t, newBlocks[2].Meta().ULID, newBlocks2[0].Meta().ULID) // Block created in last round.
	for _, b := range oldBlocks {
		// The surviving second block is not one among the old blocks.
		require.NotEqual(t, b.Meta().ULID, newBlocks2[1].Meta().ULID)
	}
}