diff --git a/CHANGELOG.md b/CHANGELOG.md index 89c47cf1692..4bb267062c1 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -69,6 +69,7 @@ * [ENHANCEMENT] Distributor: Prevent failed ingestion from affecting rate limiting. #3825 * [ENHANCEMENT] Blocks storage: added `-blocks-storage.s3.region` support to S3 client configuration. #3811 * [ENHANCEMENT] Distributor: Remove cached subrings for inactive users when using shuffle sharding. #3849 +* [ENHANCEMENT] Ingester: attempt to prevent idle compaction from happening concurrently in all ingesters by adding up to 25% positive jitter to the configured idle timeout (`-blocks-storage.tsdb.head-compaction-idle-timeout`). #3850 * [ENHANCEMENT] Compactor: cleanup local files for users that are no longer owned by compactor. #3851 * [ENHANCEMENT] Store-gateway: close empty bucket stores, and delete leftover local files for tenats that no longer belong to store-gateway. #3853 * [BUGFIX] Cortex: Fixed issue where fatal errors and various log messages where not logged. #3778 diff --git a/docs/blocks-storage/querier.md b/docs/blocks-storage/querier.md index ca728540b52..146bfc3b829 100644 --- a/docs/blocks-storage/querier.md +++ b/docs/blocks-storage/querier.md @@ -767,7 +767,9 @@ blocks_storage: # CLI flag: -blocks-storage.tsdb.head-compaction-concurrency [head_compaction_concurrency: <int> | default = 5] - # If TSDB head is idle for this duration, it is compacted. 0 means disabled. + # If TSDB head is idle for this duration, it is compacted. Note that up to + # 25% jitter is added to the value to avoid ingesters compacting + # concurrently. 0 means disabled. 
# CLI flag: -blocks-storage.tsdb.head-compaction-idle-timeout [head_compaction_idle_timeout: <duration> | default = 1h] diff --git a/docs/blocks-storage/store-gateway.md b/docs/blocks-storage/store-gateway.md index 71309e889b4..73e94805e31 100644 --- a/docs/blocks-storage/store-gateway.md +++ b/docs/blocks-storage/store-gateway.md @@ -814,7 +814,9 @@ blocks_storage: # CLI flag: -blocks-storage.tsdb.head-compaction-concurrency [head_compaction_concurrency: <int> | default = 5] - # If TSDB head is idle for this duration, it is compacted. 0 means disabled. + # If TSDB head is idle for this duration, it is compacted. Note that up to + # 25% jitter is added to the value to avoid ingesters compacting + # concurrently. 0 means disabled. # CLI flag: -blocks-storage.tsdb.head-compaction-idle-timeout [head_compaction_idle_timeout: <duration> | default = 1h] diff --git a/docs/configuration/config-file-reference.md b/docs/configuration/config-file-reference.md index 64a46f5824f..e3227cdaa55 100644 --- a/docs/configuration/config-file-reference.md +++ b/docs/configuration/config-file-reference.md @@ -4567,7 +4567,9 @@ tsdb: # CLI flag: -blocks-storage.tsdb.head-compaction-concurrency [head_compaction_concurrency: <int> | default = 5] - # If TSDB head is idle for this duration, it is compacted. 0 means disabled. + # If TSDB head is idle for this duration, it is compacted. Note that up to 25% + # jitter is added to the value to avoid ingesters compacting concurrently. 0 + # means disabled. # CLI flag: -blocks-storage.tsdb.head-compaction-idle-timeout [head_compaction_idle_timeout: <duration> | default = 1h] diff --git a/pkg/ingester/ingester_v2.go b/pkg/ingester/ingester_v2.go index e31af7123b1..dcc64e5747c 100644 --- a/pkg/ingester/ingester_v2.go +++ b/pkg/ingester/ingester_v2.go @@ -45,6 +45,9 @@ import ( const ( errTSDBCreateIncompatibleState = "cannot create a new TSDB while the ingester is not in active state (current state: %s)" errTSDBIngest = "err: %v. 
timestamp=%s, series=%s" // Using error.Wrap puts the message before the error and if the series is too long, its truncated. + + // Jitter applied to the idle timeout to prevent compaction in all ingesters concurrently. + compactionIdleTimeoutJitter = 0.25 ) // Shipper interface is used to have an easy way to mock it in tests. @@ -355,6 +358,9 @@ type TSDBState struct { forceCompactTrigger chan chan<- struct{} shipTrigger chan chan<- struct{} + // Timeout chosen for idle compactions. + compactionIdleTimeout time.Duration + // Head compactions metrics. compactionsTriggered prometheus.Counter compactionsFailed prometheus.Counter @@ -476,6 +482,10 @@ func NewV2(cfg Config, clientConfig client.Config, limits *validation.Overrides, i.TSDBState.shipperIngesterID = i.lifecycler.ID + // Apply positive jitter only to ensure that the minimum timeout is adhered to. + i.TSDBState.compactionIdleTimeout = util.DurationWithPositiveJitter(i.cfg.BlocksStorageConfig.TSDB.HeadCompactionIdleTimeout, compactionIdleTimeoutJitter) + level.Info(i.logger).Log("msg", "TSDB idle compaction timeout set", "timeout", i.TSDBState.compactionIdleTimeout) + i.BasicService = services.NewBasicService(i.startingV2, i.updateLoop, i.stoppingV2) return i, nil } @@ -1666,7 +1676,7 @@ func (i *Ingester) compactBlocks(ctx context.Context, force bool) { reason = "forced" err = userDB.compactHead(i.cfg.BlocksStorageConfig.TSDB.BlockRanges[0].Milliseconds()) - case i.cfg.BlocksStorageConfig.TSDB.HeadCompactionIdleTimeout > 0 && userDB.isIdle(time.Now(), i.cfg.BlocksStorageConfig.TSDB.HeadCompactionIdleTimeout): + case i.TSDBState.compactionIdleTimeout > 0 && userDB.isIdle(time.Now(), i.TSDBState.compactionIdleTimeout): reason = "idle" level.Info(i.logger).Log("msg", "TSDB is idle, forcing compaction", "user", userID) err = userDB.compactHead(i.cfg.BlocksStorageConfig.TSDB.BlockRanges[0].Milliseconds()) diff --git a/pkg/ingester/ingester_v2_test.go b/pkg/ingester/ingester_v2_test.go index 
5fe8da08a96..a6aa215642b 100644 --- a/pkg/ingester/ingester_v2_test.go +++ b/pkg/ingester/ingester_v2_test.go @@ -2373,8 +2373,8 @@ func TestIngesterCompactIdleBlock(t *testing.T) { cortex_ingester_memory_users 1 `), memSeriesCreatedTotalName, memSeriesRemovedTotalName, "cortex_ingester_memory_users")) - // wait one second -- TSDB is now idle. - time.Sleep(cfg.BlocksStorageConfig.TSDB.HeadCompactionIdleTimeout) + // wait one second (plus maximum jitter) -- TSDB is now idle. + time.Sleep(time.Duration(float64(cfg.BlocksStorageConfig.TSDB.HeadCompactionIdleTimeout) * (1 + compactionIdleTimeoutJitter))) i.compactBlocks(context.Background(), false) verifyCompactedHead(t, i, true) diff --git a/pkg/storage/tsdb/config.go b/pkg/storage/tsdb/config.go index 4035b8ec250..7b490995ba2 100644 --- a/pkg/storage/tsdb/config.go +++ b/pkg/storage/tsdb/config.go @@ -154,7 +154,7 @@ func (cfg *TSDBConfig) RegisterFlags(f *flag.FlagSet) { f.IntVar(&cfg.MaxTSDBOpeningConcurrencyOnStartup, "blocks-storage.tsdb.max-tsdb-opening-concurrency-on-startup", 10, "limit the number of concurrently opening TSDB's on startup") f.DurationVar(&cfg.HeadCompactionInterval, "blocks-storage.tsdb.head-compaction-interval", 1*time.Minute, "How frequently does Cortex try to compact TSDB head. Block is only created if data covers smallest block range. Must be greater than 0 and max 5 minutes.") f.IntVar(&cfg.HeadCompactionConcurrency, "blocks-storage.tsdb.head-compaction-concurrency", 5, "Maximum number of tenants concurrently compacting TSDB head into a new block") - f.DurationVar(&cfg.HeadCompactionIdleTimeout, "blocks-storage.tsdb.head-compaction-idle-timeout", 1*time.Hour, "If TSDB head is idle for this duration, it is compacted. 0 means disabled.") + f.DurationVar(&cfg.HeadCompactionIdleTimeout, "blocks-storage.tsdb.head-compaction-idle-timeout", 1*time.Hour, "If TSDB head is idle for this duration, it is compacted. 
Note that up to 25% jitter is added to the value to avoid ingesters compacting concurrently. 0 means disabled.")
 	f.IntVar(&cfg.HeadChunksWriteBufferSize, "blocks-storage.tsdb.head-chunks-write-buffer-size-bytes", chunks.DefaultWriteBufferSize, "The write buffer size used by the head chunks mapper. Lower values reduce memory utilisation on clusters with a large number of tenants at the cost of increased disk I/O operations.")
 	f.IntVar(&cfg.StripeSize, "blocks-storage.tsdb.stripe-size", 16384, "The number of shards of series to use in TSDB (must be a power of 2). Reducing this will decrease memory footprint, but can negatively impact performance.")
 	f.BoolVar(&cfg.WALCompressionEnabled, "blocks-storage.tsdb.wal-compression-enabled", false, "True to enable TSDB WAL compression.")
diff --git a/pkg/util/time.go b/pkg/util/time.go
index 24054a34224..58f951a8b3d 100644
--- a/pkg/util/time.go
+++ b/pkg/util/time.go
@@ -60,3 +60,25 @@ func DurationWithJitter(input time.Duration, variancePerc float64) time.Duration
 
 	return input + time.Duration(jitter)
 }
+
+// DurationWithPositiveJitter returns a random duration in the half-open interval
+// ["input", "input + input*variancePerc"). The input is returned unchanged whenever
+// input*variancePerc does not amount to at least 1ns of variance.
+func DurationWithPositiveJitter(input time.Duration, variancePerc float64) time.Duration {
+	// No duration? No jitter.
+	if input == 0 {
+		return 0
+	}
+
+	variance := int64(float64(input) * variancePerc)
+	// rand.Int63n panics if its argument is <= 0, which happens for a zero or
+	// negative variancePerc, a negative input, or an input too small to yield
+	// at least 1ns of variance. There is nothing to randomise in those cases.
+	if variance <= 0 {
+		return input
+	}
+
+	jitter := rand.Int63n(variance)
+
+	return input + time.Duration(jitter)
+}
diff --git a/pkg/util/time_test.go b/pkg/util/time_test.go
index ea10ac74899..700fe55fab5 100644
--- a/pkg/util/time_test.go
+++ b/pkg/util/time_test.go
@@ -40,6 +40,26 @@ func TestDurationWithJitter_ZeroInputDuration(t *testing.T) {
 	assert.Equal(t, time.Duration(0), DurationWithJitter(time.Duration(0), 0.5))
 }
 
+func TestDurationWithPositiveJitter(t *testing.T) {
+	const numRuns = 1000
+
+	for i := 0; i < numRuns; i++ {
+		actual := DurationWithPositiveJitter(time.Minute, 0.5)
+		assert.GreaterOrEqual(t, int64(actual), int64(60*time.Second))
+		assert.LessOrEqual(t, int64(actual), int64(90*time.Second))
+	}
+}
+
+func TestDurationWithPositiveJitter_ZeroInputDuration(t *testing.T) {
+	assert.Equal(t, time.Duration(0), DurationWithPositiveJitter(time.Duration(0), 0.5))
+}
+
+func TestDurationWithPositiveJitter_NonPositiveVariance(t *testing.T) {
+	// Neither call may panic; the input must come back unchanged.
+	assert.Equal(t, time.Minute, DurationWithPositiveJitter(time.Minute, 0))
+	assert.Equal(t, time.Nanosecond, DurationWithPositiveJitter(time.Nanosecond, 0.25))
+}
+
 func TestParseTime(t *testing.T) {
 	var tests = []struct {
 		input string