6 changes: 6 additions & 0 deletions docs/blocks-storage/querier.md
@@ -776,6 +776,12 @@ blocks_storage:
# CLI flag: -blocks-storage.tsdb.head-compaction-idle-timeout
[head_compaction_idle_timeout: <duration> | default = 1h]

# When idle compacting the TSDB head, only compact samples older than this
# age, allowing newer samples to still be ingested. 0 means all samples are
# compacted.
# CLI flag: -blocks-storage.tsdb.head-compaction-idle-min-age
[head_compaction_idle_min_age: <duration> | default = 0s]

# The write buffer size used by the head chunks mapper. Lower values reduce
# memory utilisation on clusters with a large number of tenants at the cost
# of increased disk I/O operations.
6 changes: 6 additions & 0 deletions docs/blocks-storage/store-gateway.md
@@ -823,6 +823,12 @@ blocks_storage:
# CLI flag: -blocks-storage.tsdb.head-compaction-idle-timeout
[head_compaction_idle_timeout: <duration> | default = 1h]

# When idle compacting the TSDB head, only compact samples older than this
# age, allowing newer samples to still be ingested. 0 means all samples are
# compacted.
# CLI flag: -blocks-storage.tsdb.head-compaction-idle-min-age
[head_compaction_idle_min_age: <duration> | default = 0s]

# The write buffer size used by the head chunks mapper. Lower values reduce
# memory utilisation on clusters with a large number of tenants at the cost
# of increased disk I/O operations.
6 changes: 6 additions & 0 deletions docs/configuration/config-file-reference.md
@@ -4389,6 +4389,12 @@ tsdb:
# CLI flag: -blocks-storage.tsdb.head-compaction-idle-timeout
[head_compaction_idle_timeout: <duration> | default = 1h]

# When idle compacting the TSDB head, only compact samples older than this
# age, allowing newer samples to still be ingested. 0 means all samples are
# compacted.
# CLI flag: -blocks-storage.tsdb.head-compaction-idle-min-age
[head_compaction_idle_min_age: <duration> | default = 0s]

# The write buffer size used by the head chunks mapper. Lower values reduce
# memory utilisation on clusters with a large number of tenants at the cost of
# increased disk I/O operations.
33 changes: 30 additions & 3 deletions pkg/ingester/ingester_v2.go
@@ -159,7 +159,16 @@ func (u *userTSDB) casState(from, to tsdbState) bool {
}

// compactHead compacts the Head block at specified block durations avoiding a single huge block.
func (u *userTSDB) compactHead(blockDuration int64) error {
// The minimum age parameter dictates how old a sample must be before it is compacted. Passing
// zero for this parameter means that all samples are compacted.
func (u *userTSDB) compactHead(blockDuration int64, sampleMinAge time.Duration) error {

// Determine the maximum sample timestamp (in milliseconds) to compact.
compactMaxTime := time.Now().Add(-sampleMinAge).UnixNano() / 1000000

// TODO: To enable queries whilst compacting, store the timestamp
// computed above. Samples newer than this can be admitted.

if !u.casState(active, forceCompacting) {
return errors.New("TSDB head cannot be compacted because it is not in active state (possibly being closed or blocks shipping in progress)")
}
@@ -174,10 +183,20 @@ func (u *userTSDB) compactHead(blockDuration int64) error {

minTime, maxTime := h.MinTime(), h.MaxTime()

// Helper to enforce sampleMinAge - we should stop compacting once
// the maximum time of the new block exceeds the allowed timestamp.
isCompactMaxTimeReached := func(t int64) bool {
return sampleMinAge > 0 && t > compactMaxTime
}

for (minTime/blockDuration)*blockDuration != (maxTime/blockDuration)*blockDuration {
// Data in Head spans across multiple block ranges, so we break it into blocks here.
// Block max time is exclusive, so we do a -1 here.
blockMaxTime := ((minTime/blockDuration)+1)*blockDuration - 1

if isCompactMaxTimeReached(blockMaxTime) {
return nil
}
if err := u.db.CompactHead(tsdb.NewRangeHead(h, minTime, blockMaxTime)); err != nil {
return err
}
Expand All @@ -186,6 +205,9 @@ func (u *userTSDB) compactHead(blockDuration int64) error {
minTime, maxTime = h.MinTime(), h.MaxTime()
}

if isCompactMaxTimeReached(maxTime) {
return nil
}
return u.db.CompactHead(tsdb.NewRangeHead(h, minTime, maxTime))
}
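
For illustration, here is a minimal standalone sketch of the cutoff logic introduced above, assuming TSDB's millisecond timestamps; the helper names (`compactCutoffMillis`, `stopCompactingAt`) are hypothetical and not part of this change.

package main

import (
	"fmt"
	"time"
)

// compactCutoffMillis returns the newest sample timestamp (in milliseconds,
// as used by TSDB) that an idle head compaction may include when a minimum
// sample age is configured.
func compactCutoffMillis(now time.Time, sampleMinAge time.Duration) int64 {
	return now.Add(-sampleMinAge).UnixNano() / int64(time.Millisecond)
}

// stopCompactingAt mirrors the check in compactHead: with a zero minimum age
// the cutoff is ignored and the whole head is compacted.
func stopCompactingAt(blockMaxTime, cutoff int64, sampleMinAge time.Duration) bool {
	return sampleMinAge > 0 && blockMaxTime > cutoff
}

func main() {
	now := time.Now()
	minAge := time.Hour
	cutoff := compactCutoffMillis(now, minAge)

	oldBlockMax := now.Add(-2*time.Hour).UnixNano() / int64(time.Millisecond)
	recentBlockMax := now.Add(-30*time.Minute).UnixNano() / int64(time.Millisecond)

	fmt.Println(stopCompactingAt(oldBlockMax, cutoff, minAge))    // false: block is old enough and gets compacted
	fmt.Println(stopCompactingAt(recentBlockMax, cutoff, minAge)) // true: block ends too recently, compaction stops here
}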

@@ -1663,12 +1685,17 @@ func (i *Ingester) compactBlocks(ctx context.Context, force bool) {
switch {
case force:
reason = "forced"
err = userDB.compactHead(i.cfg.BlocksStorageConfig.TSDB.BlockRanges[0].Milliseconds())
// For client-initiated force compaction, do not enforce a minimum
// sample age: always compact the whole head. As a result, push
// attempts will fail for the duration of the compaction.
minAge := time.Duration(0)
err = userDB.compactHead(i.cfg.BlocksStorageConfig.TSDB.BlockRanges[0].Milliseconds(), minAge)

case i.cfg.BlocksStorageConfig.TSDB.HeadCompactionIdleTimeout > 0 && userDB.isIdle(time.Now(), i.cfg.BlocksStorageConfig.TSDB.HeadCompactionIdleTimeout):
reason = "idle"
level.Info(i.logger).Log("msg", "TSDB is idle, forcing compaction", "user", userID)
err = userDB.compactHead(i.cfg.BlocksStorageConfig.TSDB.BlockRanges[0].Milliseconds())
minAge := i.cfg.BlocksStorageConfig.TSDB.HeadCompactionIdleMinAge
err = userDB.compactHead(i.cfg.BlocksStorageConfig.TSDB.BlockRanges[0].Milliseconds(), minAge)

default:
reason = "regular"
22 changes: 22 additions & 0 deletions pkg/ingester/ingester_v2_test.go
@@ -2338,6 +2338,7 @@ func TestIngesterCompactIdleBlock(t *testing.T) {
cfg.BlocksStorageConfig.TSDB.ShipConcurrency = 1
cfg.BlocksStorageConfig.TSDB.HeadCompactionInterval = 1 * time.Hour // Long enough to not be reached during the test.
cfg.BlocksStorageConfig.TSDB.HeadCompactionIdleTimeout = 1 * time.Second // Testing this.
cfg.BlocksStorageConfig.TSDB.HeadCompactionIdleMinAge = 2 * time.Second

r := prometheus.NewRegistry()

@@ -2376,6 +2377,27 @@
// wait one second -- TSDB is now idle.
time.Sleep(cfg.BlocksStorageConfig.TSDB.HeadCompactionIdleTimeout)

// First idle compaction will do nothing because the samples are not old enough.
i.compactBlocks(context.Background(), false)
verifyCompactedHead(t, i, false)
require.NoError(t, testutil.GatherAndCompare(r, strings.NewReader(`
# HELP cortex_ingester_memory_series_created_total The total number of series that were created per user.
# TYPE cortex_ingester_memory_series_created_total counter
cortex_ingester_memory_series_created_total{user="1"} 1

# HELP cortex_ingester_memory_series_removed_total The total number of series that were removed per user.
# TYPE cortex_ingester_memory_series_removed_total counter
cortex_ingester_memory_series_removed_total{user="1"} 0

# HELP cortex_ingester_memory_users The current number of users in memory.
# TYPE cortex_ingester_memory_users gauge
cortex_ingester_memory_users 1
`), memSeriesCreatedTotalName, memSeriesRemovedTotalName, "cortex_ingester_memory_users"))

// Wait until the sample is old enough to be compacted.
time.Sleep(cfg.BlocksStorageConfig.TSDB.HeadCompactionIdleMinAge - cfg.BlocksStorageConfig.TSDB.HeadCompactionIdleTimeout)

// Second idle compaction will actually compact the sample.
i.compactBlocks(context.Background(), false)
verifyCompactedHead(t, i, true)
require.NoError(t, testutil.GatherAndCompare(r, strings.NewReader(`
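
As a rough sanity check on the timing the test relies on (illustrative arithmetic only, reusing the configured 1s idle timeout and 2s minimum age):

package main

import "fmt"

func main() {
	// Timestamps in milliseconds relative to the initial push of the sample.
	pushAt := int64(0)
	firstCompactionAt := int64(1000)  // runs once the 1s idle timeout has elapsed
	secondCompactionAt := int64(2000) // runs after sleeping min-age minus idle-timeout
	minAgeMs := int64(2000)           // HeadCompactionIdleMinAge = 2s

	fmt.Println(pushAt <= firstCompactionAt-minAgeMs)  // false: the cutoff still lies before the sample, so nothing is compacted
	fmt.Println(pushAt <= secondCompactionAt-minAgeMs) // true: the sample is now old enough to be compacted
}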
2 changes: 2 additions & 0 deletions pkg/storage/tsdb/config.go
@@ -122,6 +122,7 @@ type TSDBConfig struct {
HeadCompactionInterval time.Duration `yaml:"head_compaction_interval"`
HeadCompactionConcurrency int `yaml:"head_compaction_concurrency"`
HeadCompactionIdleTimeout time.Duration `yaml:"head_compaction_idle_timeout"`
HeadCompactionIdleMinAge time.Duration `yaml:"head_compaction_idle_min_age"`
HeadChunksWriteBufferSize int `yaml:"head_chunks_write_buffer_size_bytes"`
StripeSize int `yaml:"stripe_size"`
WALCompressionEnabled bool `yaml:"wal_compression_enabled"`
@@ -155,6 +156,7 @@ func (cfg *TSDBConfig) RegisterFlags(f *flag.FlagSet) {
f.DurationVar(&cfg.HeadCompactionInterval, "blocks-storage.tsdb.head-compaction-interval", 1*time.Minute, "How frequently does Cortex try to compact TSDB head. Block is only created if data covers smallest block range. Must be greater than 0 and max 5 minutes.")
f.IntVar(&cfg.HeadCompactionConcurrency, "blocks-storage.tsdb.head-compaction-concurrency", 5, "Maximum number of tenants concurrently compacting TSDB head into a new block")
f.DurationVar(&cfg.HeadCompactionIdleTimeout, "blocks-storage.tsdb.head-compaction-idle-timeout", 1*time.Hour, "If TSDB head is idle for this duration, it is compacted. 0 means disabled.")
f.DurationVar(&cfg.HeadCompactionIdleMinAge, "blocks-storage.tsdb.head-compaction-idle-min-age", 0, "When idle compacting the TSDB head, only compact samples older than this age, allowing newer samples to still be ingested. 0 means all samples are compacted.")
f.IntVar(&cfg.HeadChunksWriteBufferSize, "blocks-storage.tsdb.head-chunks-write-buffer-size-bytes", chunks.DefaultWriteBufferSize, "The write buffer size used by the head chunks mapper. Lower values reduce memory utilisation on clusters with a large number of tenants at the cost of increased disk I/O operations.")
f.IntVar(&cfg.StripeSize, "blocks-storage.tsdb.stripe-size", 16384, "The number of shards of series to use in TSDB (must be a power of 2). Reducing this will decrease memory footprint, but can negatively impact performance.")
f.BoolVar(&cfg.WALCompressionEnabled, "blocks-storage.tsdb.wal-compression-enabled", false, "True to enable TSDB WAL compression.")
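
For context, a minimal sketch of how the new flag might be exercised next to the existing idle-timeout flag. This is an illustrative standalone program rather than part of the change, and it assumes TSDBConfig can be registered against a fresh FlagSet on its own (import path as in the Cortex repository).

package main

import (
	"flag"
	"fmt"

	"github.com/cortexproject/cortex/pkg/storage/tsdb"
)

func main() {
	var cfg tsdb.TSDBConfig

	fs := flag.NewFlagSet("example", flag.ExitOnError)
	cfg.RegisterFlags(fs)

	// Compact the head after one hour of inactivity, but keep the most recent
	// 15 minutes of samples in the head so late arrivals can still be ingested
	// afterwards.
	if err := fs.Parse([]string{
		"-blocks-storage.tsdb.head-compaction-idle-timeout=1h",
		"-blocks-storage.tsdb.head-compaction-idle-min-age=15m",
	}); err != nil {
		panic(err)
	}

	fmt.Println(cfg.HeadCompactionIdleTimeout, cfg.HeadCompactionIdleMinAge) // 1h0m0s 15m0s
}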