Skip to content
Merged
Show file tree
Hide file tree
Changes from 4 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,7 @@
* [CHANGE] Ingester: Chunks flushed via /flush stay in memory until retention period is reached. This affects `cortex_ingester_memory_chunks` metric. #2778
* [CHANGE] Querier: the error message returned when the query time range exceeds `-store.max-query-length` has changed from `invalid query, length > limit (X > Y)` to `the query time range exceeds the limit (query length: X, limit: Y)`. #2826
* [CHANGE] KV: The `role` label which was a label of `multi` KV store client only has been added to metrics of every KV store client. If KV store client is not `multi`, then the value of `role` label is `primary`. #2837
* [CHANGE] Experimental TSDB: compact head when opening TSDB. This should only affect ingester startup after it was unable to compact head in previous run. #2870
* [FEATURE] Introduced `ruler.for-outage-tolerance`, the maximum time to tolerate an outage when restoring the "for" state of an alert. #2783
* [FEATURE] Introduced `ruler.for-grace-period`, Minimum duration between alert and restored "for" state. This is maintained only for alerts with configured "for" time greater than grace period. #2783
* [FEATURE] Introduced `ruler.resend-delay`, Minimum amount of time to wait before resending an alert to Alertmanager. #2783
Expand Down
8 changes: 8 additions & 0 deletions pkg/ingester/ingester_v2.go
Original file line number Diff line number Diff line change
Expand Up @@ -895,6 +895,14 @@ func (i *Ingester) createTSDB(userID string) (*userTSDB, error) {
}
db.DisableCompactions() // we will compact on our own schedule

// Run compaction before using this TSDB. If there is data in head that needs to be put into blocks,
// this will actually create the blocks. If there is no data (empty TSDB), this is a no-op.
// We don't do blocks compaction.
err = db.Compact()
if err != nil {
return nil, err
}

userDB.DB = db
// We set the limiter here because we don't want to limit
// series during WAL replay.
Expand Down
74 changes: 74 additions & 0 deletions pkg/ingester/ingester_v2_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,7 @@ import (
"github.com/prometheus/common/model"
"github.com/prometheus/prometheus/pkg/labels"
"github.com/prometheus/prometheus/storage"
"github.com/prometheus/prometheus/tsdb"
"github.com/stretchr/testify/assert"
"github.com/stretchr/testify/mock"
"github.com/stretchr/testify/require"
Expand Down Expand Up @@ -1602,3 +1603,76 @@ func pushSingleSample(t *testing.T, i *Ingester) {
_, err := i.v2Push(ctx, req)
require.NoError(t, err)
}

func TestHeadCompactionOnStartup(t *testing.T) {
	// Temporary directory acting as the ingester's TSDB root.
	dir, err := ioutil.TempDir("", "tsdb")
	require.NoError(t, err)
	t.Cleanup(func() {
		os.RemoveAll(dir)
	})

	// Populate a per-user TSDB whose head covers 24 hours of samples and
	// that has never been compacted into blocks.
	{
		const chunksToWrite = 12 // 12 chunks * 2h each = 24h of data.
		blockRange := 2 * time.Hour.Milliseconds()

		userTSDBDir := filepath.Join(dir, userID)
		require.NoError(t, os.Mkdir(userTSDBDir, 0700))

		db, err := tsdb.Open(userTSDBDir, nil, nil, &tsdb.Options{
			RetentionDuration: int64(time.Hour * 25 / time.Millisecond),
			NoLockfile:        true,
			MinBlockDuration:  blockRange,
			MaxBlockDuration:  blockRange,
		})
		require.NoError(t, err)

		db.DisableCompactions()
		head := db.Head()

		series := labels.Labels{{Name: "n", Value: "v"}}
		for ix := 0; ix < chunksToWrite; ix++ {
			// Append through the head directly: db.Appender() would run a
			// compaction check, which this test must avoid.
			app := head.Appender()
			_, err := app.Add(series, int64(ix)*blockRange+1, 9.99)
			require.NoError(t, err)
			_, err = app.Add(series, int64(ix+1)*blockRange, 9.99)
			require.NoError(t, err)
			require.NoError(t, app.Commit())
		}

		// Sanity check: head spans more than 23h and no blocks exist yet.
		headSpan := time.Duration(head.MaxTime()-head.MinTime()) * time.Millisecond
		require.True(t, headSpan > 23*time.Hour)
		require.Empty(t, db.Blocks())
		require.NoError(t, db.Close())
	}

	clientCfg := defaultClientTestConfig()
	limits := defaultLimitsTestConfig()

	overrides, err := validation.NewOverrides(limits, nil)
	require.NoError(t, err)

	ingesterCfg := defaultIngesterTestConfig()
	ingesterCfg.TSDBEnabled = true
	ingesterCfg.TSDBConfig.Dir = dir
	ingesterCfg.TSDBConfig.Backend = "s3"
	ingesterCfg.TSDBConfig.S3.Endpoint = "localhost"

	ingester, err := NewV2(ingesterCfg, clientCfg, overrides, nil)
	require.NoError(t, err)
	require.NoError(t, services.StartAndAwaitRunning(context.Background(), ingester))

	defer services.StopAndAwaitTerminated(context.Background(), ingester) //nolint:errcheck

	// Opening the TSDB on ingester startup must have compacted the 24h head
	// into blocks, leaving only a small residual head range.
	userDB := ingester.getTSDB(userID)
	require.NotNil(t, userDB)

	head := userDB.Head()

	headSpan := time.Duration(head.MaxTime()-head.MinTime()) * time.Millisecond
	require.True(t, headSpan < 4*time.Hour)
	require.NotEmpty(t, userDB.Blocks())
}