diff --git a/docs/blocks-storage/querier.md b/docs/blocks-storage/querier.md
index b541aa3aa95..b3b13ba78b0 100644
--- a/docs/blocks-storage/querier.md
+++ b/docs/blocks-storage/querier.md
@@ -776,6 +776,12 @@ blocks_storage:
     # CLI flag: -blocks-storage.tsdb.head-compaction-idle-timeout
     [head_compaction_idle_timeout: <duration> | default = 1h]
 
+    # When idle-compacting the TSDB head, only compact samples older than this
+    # age, allowing newer samples to still be ingested. 0 means all samples are
+    # compacted.
+    # CLI flag: -blocks-storage.tsdb.head-compaction-idle-min-age
+    [head_compaction_idle_min_age: <duration> | default = 0s]
+
     # The write buffer size used by the head chunks mapper. Lower values reduce
     # memory utilisation on clusters with a large number of tenants at the cost
     # of increased disk I/O operations.
diff --git a/docs/blocks-storage/store-gateway.md b/docs/blocks-storage/store-gateway.md
index d8b59e7d9ad..57fa41a9b58 100644
--- a/docs/blocks-storage/store-gateway.md
+++ b/docs/blocks-storage/store-gateway.md
@@ -823,6 +823,12 @@ blocks_storage:
     # CLI flag: -blocks-storage.tsdb.head-compaction-idle-timeout
     [head_compaction_idle_timeout: <duration> | default = 1h]
 
+    # When idle-compacting the TSDB head, only compact samples older than this
+    # age, allowing newer samples to still be ingested. 0 means all samples are
+    # compacted.
+    # CLI flag: -blocks-storage.tsdb.head-compaction-idle-min-age
+    [head_compaction_idle_min_age: <duration> | default = 0s]
+
     # The write buffer size used by the head chunks mapper. Lower values reduce
     # memory utilisation on clusters with a large number of tenants at the cost
     # of increased disk I/O operations.
diff --git a/docs/configuration/config-file-reference.md b/docs/configuration/config-file-reference.md
index 4cc5002f69e..231faface1e 100644
--- a/docs/configuration/config-file-reference.md
+++ b/docs/configuration/config-file-reference.md
@@ -4389,6 +4389,12 @@ tsdb:
   # CLI flag: -blocks-storage.tsdb.head-compaction-idle-timeout
   [head_compaction_idle_timeout: <duration> | default = 1h]
 
+  # When idle-compacting the TSDB head, only compact samples older than this
+  # age, allowing newer samples to still be ingested. 0 means all samples are
+  # compacted.
+  # CLI flag: -blocks-storage.tsdb.head-compaction-idle-min-age
+  [head_compaction_idle_min_age: <duration> | default = 0s]
+
   # The write buffer size used by the head chunks mapper. Lower values reduce
   # memory utilisation on clusters with a large number of tenants at the cost of
   # increased disk I/O operations.
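As a quick illustration of the documented flag pair, here is a minimal, standalone Go sketch (not part of this change) that registers the two flags with the defaults shown above and parses example values. The flag set name and the 15m/30m settings are hypothetical, chosen only for illustration.

package main

import (
	"flag"
	"fmt"
	"time"
)

func main() {
	fs := flag.NewFlagSet("example", flag.ContinueOnError)

	// Defaults mirror the documented values: 1h idle timeout, 0s minimum age.
	idleTimeout := fs.Duration("blocks-storage.tsdb.head-compaction-idle-timeout", time.Hour, "idle timeout")
	idleMinAge := fs.Duration("blocks-storage.tsdb.head-compaction-idle-min-age", 0, "minimum sample age for idle compaction")

	// Hypothetical operator settings; any string accepted by time.ParseDuration works.
	_ = fs.Parse([]string{
		"-blocks-storage.tsdb.head-compaction-idle-timeout=15m",
		"-blocks-storage.tsdb.head-compaction-idle-min-age=30m",
	})

	fmt.Println("idle timeout:", *idleTimeout, "minimum sample age:", *idleMinAge)
}
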
diff --git a/pkg/ingester/ingester_v2.go b/pkg/ingester/ingester_v2.go
index 6437d894273..427ea9beb5d 100644
--- a/pkg/ingester/ingester_v2.go
+++ b/pkg/ingester/ingester_v2.go
@@ -159,7 +159,16 @@ func (u *userTSDB) casState(from, to tsdbState) bool {
 }
 
 // compactHead compacts the Head block at specified block durations avoiding a single huge block.
-func (u *userTSDB) compactHead(blockDuration int64) error {
+// The minimum age parameter dictates how old a sample must be before it is compacted. Passing
+// zero for this parameter means that all samples are compacted.
+func (u *userTSDB) compactHead(blockDuration int64, sampleMinAge time.Duration) error {
+
+    // Determine the maximum sample timestamp to compact, in milliseconds (the unit TSDB uses).
+    compactMaxTime := time.Now().Add(-sampleMinAge).UnixNano() / 1000000
+
+    // TODO: To enable queries whilst compacting, store the timestamp
+    // computed above. Samples newer than this can be admitted.
+
     if !u.casState(active, forceCompacting) {
         return errors.New("TSDB head cannot be compacted because it is not in active state (possibly being closed or blocks shipping in progress)")
     }
@@ -174,10 +183,20 @@ func (u *userTSDB) compactHead(blockDuration int64) error {
 
     minTime, maxTime := h.MinTime(), h.MaxTime()
 
+    // Helper to enforce sampleMinAge: we should stop compacting once
+    // the maximum time of the new block exceeds the allowed timestamp.
+    isCompactMaxTimeReached := func(t int64) bool {
+        return sampleMinAge > 0 && t > compactMaxTime
+    }
+
     for (minTime/blockDuration)*blockDuration != (maxTime/blockDuration)*blockDuration {
         // Data in Head spans across multiple block ranges, so we break it into blocks here.
         // Block max time is exclusive, so we do a -1 here.
         blockMaxTime := ((minTime/blockDuration)+1)*blockDuration - 1
+
+        if isCompactMaxTimeReached(blockMaxTime) {
+            return nil
+        }
         if err := u.db.CompactHead(tsdb.NewRangeHead(h, minTime, blockMaxTime)); err != nil {
             return err
         }
@@ -186,6 +205,9 @@ func (u *userTSDB) compactHead(blockDuration int64) error {
         minTime, maxTime = h.MinTime(), h.MaxTime()
     }
 
+    if isCompactMaxTimeReached(maxTime) {
+        return nil
+    }
     return u.db.CompactHead(tsdb.NewRangeHead(h, minTime, maxTime))
 }
 
@@ -1663,12 +1685,17 @@ func (i *Ingester) compactBlocks(ctx context.Context, force bool) {
         switch {
         case force:
             reason = "forced"
-            err = userDB.compactHead(i.cfg.BlocksStorageConfig.TSDB.BlockRanges[0].Milliseconds())
+            // For client-initiated force compaction, do not enforce a
+            // minimum sample age; always compact the whole head. As a
+            // result, push attempts will fail while the compaction runs.
+            minAge := time.Duration(0)
+            err = userDB.compactHead(i.cfg.BlocksStorageConfig.TSDB.BlockRanges[0].Milliseconds(), minAge)
 
         case i.cfg.BlocksStorageConfig.TSDB.HeadCompactionIdleTimeout > 0 && userDB.isIdle(time.Now(), i.cfg.BlocksStorageConfig.TSDB.HeadCompactionIdleTimeout):
             reason = "idle"
             level.Info(i.logger).Log("msg", "TSDB is idle, forcing compaction", "user", userID)
-            err = userDB.compactHead(i.cfg.BlocksStorageConfig.TSDB.BlockRanges[0].Milliseconds())
+            minAge := i.cfg.BlocksStorageConfig.TSDB.HeadCompactionIdleMinAge
+            err = userDB.compactHead(i.cfg.BlocksStorageConfig.TSDB.BlockRanges[0].Milliseconds(), minAge)
 
         default:
             reason = "regular"
diff --git a/pkg/ingester/ingester_v2_test.go b/pkg/ingester/ingester_v2_test.go
index c704d0ed6a1..c9d81eb39b7 100644
--- a/pkg/ingester/ingester_v2_test.go
+++ b/pkg/ingester/ingester_v2_test.go
@@ -2338,6 +2338,7 @@ func TestIngesterCompactIdleBlock(t *testing.T) {
     cfg.BlocksStorageConfig.TSDB.ShipConcurrency = 1
     cfg.BlocksStorageConfig.TSDB.HeadCompactionInterval = 1 * time.Hour      // Long enough to not be reached during the test.
     cfg.BlocksStorageConfig.TSDB.HeadCompactionIdleTimeout = 1 * time.Second // Testing this.
+    cfg.BlocksStorageConfig.TSDB.HeadCompactionIdleMinAge = 2 * time.Second
 
     r := prometheus.NewRegistry()
 
@@ -2376,6 +2377,27 @@ func TestIngesterCompactIdleBlock(t *testing.T) {
     // wait one second -- TSDB is now idle.
     time.Sleep(cfg.BlocksStorageConfig.TSDB.HeadCompactionIdleTimeout)
 
+    // First idle compaction will do nothing because the samples are not old enough.
+    i.compactBlocks(context.Background(), false)
+    verifyCompactedHead(t, i, false)
+    require.NoError(t, testutil.GatherAndCompare(r, strings.NewReader(`
+        # HELP cortex_ingester_memory_series_created_total The total number of series that were created per user.
+        # TYPE cortex_ingester_memory_series_created_total counter
+        cortex_ingester_memory_series_created_total{user="1"} 1
+
+        # HELP cortex_ingester_memory_series_removed_total The total number of series that were removed per user.
+        # TYPE cortex_ingester_memory_series_removed_total counter
+        cortex_ingester_memory_series_removed_total{user="1"} 0
+
+        # HELP cortex_ingester_memory_users The current number of users in memory.
+        # TYPE cortex_ingester_memory_users gauge
+        cortex_ingester_memory_users 1
+    `), memSeriesCreatedTotalName, memSeriesRemovedTotalName, "cortex_ingester_memory_users"))
+
+    // Wait until the sample is old enough to be compacted.
+    time.Sleep(cfg.BlocksStorageConfig.TSDB.HeadCompactionIdleMinAge - cfg.BlocksStorageConfig.TSDB.HeadCompactionIdleTimeout)
+
+    // Second idle compaction will actually compact the sample.
     i.compactBlocks(context.Background(), false)
     verifyCompactedHead(t, i, true)
     require.NoError(t, testutil.GatherAndCompare(r, strings.NewReader(`
diff --git a/pkg/storage/tsdb/config.go b/pkg/storage/tsdb/config.go
index 4035b8ec250..7dc3ec056eb 100644
--- a/pkg/storage/tsdb/config.go
+++ b/pkg/storage/tsdb/config.go
@@ -122,6 +122,7 @@ type TSDBConfig struct {
     HeadCompactionInterval    time.Duration `yaml:"head_compaction_interval"`
     HeadCompactionConcurrency int           `yaml:"head_compaction_concurrency"`
     HeadCompactionIdleTimeout time.Duration `yaml:"head_compaction_idle_timeout"`
+    HeadCompactionIdleMinAge  time.Duration `yaml:"head_compaction_idle_min_age"`
     HeadChunksWriteBufferSize int           `yaml:"head_chunks_write_buffer_size_bytes"`
     StripeSize                int           `yaml:"stripe_size"`
     WALCompressionEnabled     bool          `yaml:"wal_compression_enabled"`
@@ -155,6 +156,7 @@ func (cfg *TSDBConfig) RegisterFlags(f *flag.FlagSet) {
     f.DurationVar(&cfg.HeadCompactionInterval, "blocks-storage.tsdb.head-compaction-interval", 1*time.Minute, "How frequently does Cortex try to compact TSDB head. Block is only created if data covers smallest block range. Must be greater than 0 and max 5 minutes.")
     f.IntVar(&cfg.HeadCompactionConcurrency, "blocks-storage.tsdb.head-compaction-concurrency", 5, "Maximum number of tenants concurrently compacting TSDB head into a new block")
     f.DurationVar(&cfg.HeadCompactionIdleTimeout, "blocks-storage.tsdb.head-compaction-idle-timeout", 1*time.Hour, "If TSDB head is idle for this duration, it is compacted. 0 means disabled.")
+    f.DurationVar(&cfg.HeadCompactionIdleMinAge, "blocks-storage.tsdb.head-compaction-idle-min-age", 0, "When idle-compacting the TSDB head, only compact samples older than this age, allowing newer samples to still be ingested. 0 means all samples are compacted.")
     f.IntVar(&cfg.HeadChunksWriteBufferSize, "blocks-storage.tsdb.head-chunks-write-buffer-size-bytes", chunks.DefaultWriteBufferSize, "The write buffer size used by the head chunks mapper. Lower values reduce memory utilisation on clusters with a large number of tenants at the cost of increased disk I/O operations.")
     f.IntVar(&cfg.StripeSize, "blocks-storage.tsdb.stripe-size", 16384, "The number of shards of series to use in TSDB (must be a power of 2). Reducing this will decrease memory footprint, but can negatively impact performance.")
     f.BoolVar(&cfg.WALCompressionEnabled, "blocks-storage.tsdb.wal-compression-enabled", false, "True to enable TSDB WAL compression.")
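To make the min-age gating in compactHead easier to follow, the following standalone sketch (not code from this change; the 30m minimum age, 2h block range, and 3h head span are made-up values) reproduces the cutoff arithmetic: the cutoff is expressed in Unix milliseconds because TSDB sample timestamps are milliseconds, and a candidate range is skipped as soon as its maximum time crosses that cutoff.

package main

import (
	"fmt"
	"time"
)

func main() {
	// Illustrative values only: a 30m minimum sample age and a 2h block range.
	sampleMinAge := 30 * time.Minute
	blockDuration := (2 * time.Hour).Milliseconds()

	// Same arithmetic as compactHead: UnixNano()/1e6 yields Unix milliseconds,
	// the unit used for TSDB sample timestamps.
	compactMaxTime := time.Now().Add(-sampleMinAge).UnixNano() / 1000000

	// Pretend the head currently spans the last 3 hours.
	minTime := time.Now().Add(-3*time.Hour).UnixNano() / 1000000
	maxTime := time.Now().UnixNano() / 1000000

	isCompactMaxTimeReached := func(t int64) bool {
		return sampleMinAge > 0 && t > compactMaxTime
	}

	for (minTime/blockDuration)*blockDuration != (maxTime/blockDuration)*blockDuration {
		blockMaxTime := ((minTime/blockDuration)+1)*blockDuration - 1
		if isCompactMaxTimeReached(blockMaxTime) {
			fmt.Println("stop: next block would include samples newer than the cutoff")
			return
		}
		fmt.Println("would compact range", minTime, "..", blockMaxTime)
		minTime = blockMaxTime + 1 // in the real code, min/max are re-read from the head
	}
	if isCompactMaxTimeReached(maxTime) {
		fmt.Println("stop: remaining head data is newer than the cutoff")
		return
	}
	fmt.Println("would compact final range", minTime, "..", maxTime)
}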