Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -69,6 +69,7 @@
* [ENHANCEMENT] Distributor: Prevent failed ingestion from affecting rate limiting. #3825
* [ENHANCEMENT] Blocks storage: added `-blocks-storage.s3.region` support to S3 client configuration. #3811
* [ENHANCEMENT] Distributor: Remove cached subrings for inactive users when using shuffle sharding. #3849
* [ENHANCEMENT] Ingester: attempt to prevent idle compaction from happening in concurrent ingesters by introducing a 25% jitter to the configured idle timeout (`-blocks-storage.tsdb.head-compaction-idle-timeout`). #3850
* [ENHANCEMENT] Compactor: cleanup local files for users that are no longer owned by compactor. #3851
* [ENHANCEMENT] Store-gateway: close empty bucket stores, and delete leftover local files for tenants that no longer belong to store-gateway. #3853
* [BUGFIX] Cortex: Fixed issue where fatal errors and various log messages were not logged. #3778
Expand Down
4 changes: 3 additions & 1 deletion docs/blocks-storage/querier.md
Original file line number Diff line number Diff line change
Expand Up @@ -767,7 +767,9 @@ blocks_storage:
# CLI flag: -blocks-storage.tsdb.head-compaction-concurrency
[head_compaction_concurrency: <int> | default = 5]

# If TSDB head is idle for this duration, it is compacted. 0 means disabled.
# If TSDB head is idle for this duration, it is compacted. Note that up to
# 25% jitter is added to the value to avoid ingesters compacting
# concurrently. 0 means disabled.
# CLI flag: -blocks-storage.tsdb.head-compaction-idle-timeout
[head_compaction_idle_timeout: <duration> | default = 1h]

Expand Down
4 changes: 3 additions & 1 deletion docs/blocks-storage/store-gateway.md
Original file line number Diff line number Diff line change
Expand Up @@ -814,7 +814,9 @@ blocks_storage:
# CLI flag: -blocks-storage.tsdb.head-compaction-concurrency
[head_compaction_concurrency: <int> | default = 5]

# If TSDB head is idle for this duration, it is compacted. 0 means disabled.
# If TSDB head is idle for this duration, it is compacted. Note that up to
# 25% jitter is added to the value to avoid ingesters compacting
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I am just wondering, what is the reason to leave the jitter percentage unconfigurable to the user?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

We didn't have a strong opinion here, other than following how jitter has typically been added in other places.

I don't think there would be opposition to it being configurable though.

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I see. Makes sense. I agree we can make it configurable when the need arises in a separate PR.

# concurrently. 0 means disabled.
# CLI flag: -blocks-storage.tsdb.head-compaction-idle-timeout
[head_compaction_idle_timeout: <duration> | default = 1h]

Expand Down
4 changes: 3 additions & 1 deletion docs/configuration/config-file-reference.md
Original file line number Diff line number Diff line change
Expand Up @@ -4567,7 +4567,9 @@ tsdb:
# CLI flag: -blocks-storage.tsdb.head-compaction-concurrency
[head_compaction_concurrency: <int> | default = 5]

# If TSDB head is idle for this duration, it is compacted. 0 means disabled.
# If TSDB head is idle for this duration, it is compacted. Note that up to 25%
# jitter is added to the value to avoid ingesters compacting concurrently. 0
# means disabled.
# CLI flag: -blocks-storage.tsdb.head-compaction-idle-timeout
[head_compaction_idle_timeout: <duration> | default = 1h]

Expand Down
12 changes: 11 additions & 1 deletion pkg/ingester/ingester_v2.go
Original file line number Diff line number Diff line change
Expand Up @@ -45,6 +45,9 @@ import (
const (
// Format string for the error reported when a TSDB cannot be created because
// the ingester is not in active state; %s is the current state.
errTSDBCreateIncompatibleState = "cannot create a new TSDB while the ingester is not in active state (current state: %s)"
// Format string for ingestion errors: the underlying error, then the sample
// timestamp and the series.
errTSDBIngest = "err: %v. timestamp=%s, series=%s" // Using error.Wrap puts the message before the error and if the series is too long, it's truncated.

// Jitter applied to the idle timeout to prevent compaction in all ingesters concurrently.
// Expressed as a fraction of the configured timeout (0.25 means up to 25%).
compactionIdleTimeoutJitter = 0.25
)

// Shipper interface is used to have an easy way to mock it in tests.
Expand Down Expand Up @@ -355,6 +358,9 @@ type TSDBState struct {
forceCompactTrigger chan chan<- struct{}
shipTrigger chan chan<- struct{}

// Timeout chosen for idle compactions.
compactionIdleTimeout time.Duration

// Head compactions metrics.
compactionsTriggered prometheus.Counter
compactionsFailed prometheus.Counter
Expand Down Expand Up @@ -476,6 +482,10 @@ func NewV2(cfg Config, clientConfig client.Config, limits *validation.Overrides,

i.TSDBState.shipperIngesterID = i.lifecycler.ID

// Apply positive jitter only to ensure that the minimum timeout is adhered to.
i.TSDBState.compactionIdleTimeout = util.DurationWithPositiveJitter(i.cfg.BlocksStorageConfig.TSDB.HeadCompactionIdleTimeout, compactionIdleTimeoutJitter)
level.Info(i.logger).Log("msg", "TSDB idle compaction timeout set", "timeout", i.TSDBState.compactionIdleTimeout)

i.BasicService = services.NewBasicService(i.startingV2, i.updateLoop, i.stoppingV2)
return i, nil
}
Expand Down Expand Up @@ -1666,7 +1676,7 @@ func (i *Ingester) compactBlocks(ctx context.Context, force bool) {
reason = "forced"
err = userDB.compactHead(i.cfg.BlocksStorageConfig.TSDB.BlockRanges[0].Milliseconds())

case i.cfg.BlocksStorageConfig.TSDB.HeadCompactionIdleTimeout > 0 && userDB.isIdle(time.Now(), i.cfg.BlocksStorageConfig.TSDB.HeadCompactionIdleTimeout):
case i.TSDBState.compactionIdleTimeout > 0 && userDB.isIdle(time.Now(), i.TSDBState.compactionIdleTimeout):
reason = "idle"
level.Info(i.logger).Log("msg", "TSDB is idle, forcing compaction", "user", userID)
err = userDB.compactHead(i.cfg.BlocksStorageConfig.TSDB.BlockRanges[0].Milliseconds())
Expand Down
4 changes: 2 additions & 2 deletions pkg/ingester/ingester_v2_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -2373,8 +2373,8 @@ func TestIngesterCompactIdleBlock(t *testing.T) {
cortex_ingester_memory_users 1
`), memSeriesCreatedTotalName, memSeriesRemovedTotalName, "cortex_ingester_memory_users"))

// wait one second -- TSDB is now idle.
time.Sleep(cfg.BlocksStorageConfig.TSDB.HeadCompactionIdleTimeout)
// wait one second (plus maximum jitter) -- TSDB is now idle.
time.Sleep(time.Duration(float64(cfg.BlocksStorageConfig.TSDB.HeadCompactionIdleTimeout) * (1 + compactionIdleTimeoutJitter)))

i.compactBlocks(context.Background(), false)
verifyCompactedHead(t, i, true)
Expand Down
2 changes: 1 addition & 1 deletion pkg/storage/tsdb/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -154,7 +154,7 @@ func (cfg *TSDBConfig) RegisterFlags(f *flag.FlagSet) {
f.IntVar(&cfg.MaxTSDBOpeningConcurrencyOnStartup, "blocks-storage.tsdb.max-tsdb-opening-concurrency-on-startup", 10, "limit the number of concurrently opening TSDB's on startup")
f.DurationVar(&cfg.HeadCompactionInterval, "blocks-storage.tsdb.head-compaction-interval", 1*time.Minute, "How frequently does Cortex try to compact TSDB head. Block is only created if data covers smallest block range. Must be greater than 0 and max 5 minutes.")
f.IntVar(&cfg.HeadCompactionConcurrency, "blocks-storage.tsdb.head-compaction-concurrency", 5, "Maximum number of tenants concurrently compacting TSDB head into a new block")
f.DurationVar(&cfg.HeadCompactionIdleTimeout, "blocks-storage.tsdb.head-compaction-idle-timeout", 1*time.Hour, "If TSDB head is idle for this duration, it is compacted. 0 means disabled.")
f.DurationVar(&cfg.HeadCompactionIdleTimeout, "blocks-storage.tsdb.head-compaction-idle-timeout", 1*time.Hour, "If TSDB head is idle for this duration, it is compacted. Note that up to 25% jitter is added to the value to avoid ingesters compacting concurrently. 0 means disabled.")
f.IntVar(&cfg.HeadChunksWriteBufferSize, "blocks-storage.tsdb.head-chunks-write-buffer-size-bytes", chunks.DefaultWriteBufferSize, "The write buffer size used by the head chunks mapper. Lower values reduce memory utilisation on clusters with a large number of tenants at the cost of increased disk I/O operations.")
f.IntVar(&cfg.StripeSize, "blocks-storage.tsdb.stripe-size", 16384, "The number of shards of series to use in TSDB (must be a power of 2). Reducing this will decrease memory footprint, but can negatively impact performance.")
f.BoolVar(&cfg.WALCompressionEnabled, "blocks-storage.tsdb.wal-compression-enabled", false, "True to enable TSDB WAL compression.")
Expand Down
13 changes: 13 additions & 0 deletions pkg/util/time.go
Original file line number Diff line number Diff line change
Expand Up @@ -60,3 +60,16 @@ func DurationWithJitter(input time.Duration, variancePerc float64) time.Duration

return input + time.Duration(jitter)
}

// DurationWithPositiveJitter returns a random duration in the half-open
// interval [input, input + input*variancePerc). The jitter is never negative,
// so the returned duration is never shorter than input.
//
// variancePerc is the maximum jitter expressed as a fraction of input
// (e.g. 0.25 for up to 25%). When the computed variance is not positive
// (zero input, zero or negative variancePerc, a negative input, or an input
// so small that the variance truncates to 0) input is returned unchanged.
func DurationWithPositiveJitter(input time.Duration, variancePerc float64) time.Duration {
	// No duration? No jitter.
	if input == 0 {
		return 0
	}

	variance := int64(float64(input) * variancePerc)

	// rand.Int63n panics when its argument is <= 0, so there is no jitter
	// to apply in that case.
	if variance <= 0 {
		return input
	}

	return input + time.Duration(rand.Int63n(variance))
}
14 changes: 14 additions & 0 deletions pkg/util/time_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -40,6 +40,20 @@ func TestDurationWithJitter_ZeroInputDuration(t *testing.T) {
assert.Equal(t, time.Duration(0), DurationWithJitter(time.Duration(0), 0.5))
}

// TestDurationWithPositiveJitter checks that repeated calls with a one minute
// input and 50% variance always land between 60s and 90s.
func TestDurationWithPositiveJitter(t *testing.T) {
	const numRuns = 1000

	lowerBound := int64(60 * time.Second)
	upperBound := int64(90 * time.Second)

	for run := 0; run < numRuns; run++ {
		got := int64(DurationWithPositiveJitter(time.Minute, 0.5))

		assert.GreaterOrEqual(t, got, lowerBound)
		assert.LessOrEqual(t, got, upperBound)
	}
}

func TestDurationWithPositiveJitter_ZeroInputDuration(t *testing.T) {
assert.Equal(t, time.Duration(0), DurationWithPositiveJitter(time.Duration(0), 0.5))
}

func TestParseTime(t *testing.T) {
var tests = []struct {
input string
Expand Down