Skip to content
Merged
Show file tree
Hide file tree
Changes from 4 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -68,6 +68,7 @@
* [ENHANCEMENT] Distributor: Prevent failed ingestion from affecting rate limiting. #3825
* [ENHANCEMENT] Blocks storage: added `-blocks-storage.s3.region` support to S3 client configuration. #3811
* [ENHANCEMENT] Distributor: Remove cached subrings for inactive users when using shuffle sharding. #3849
* [ENHANCEMENT] Ingester: attempt to prevent idle compaction from happening in concurrent ingesters by introducing a 25% jitter to the configured idle timeout (`-blocks-storage.tsdb.head-compaction-idle-timeout`). #3850
* [BUGFIX] Cortex: Fixed issue where fatal errors and various log messages were not logged. #3778
* [BUGFIX] HA Tracker: don't track as error in the `cortex_kv_request_duration_seconds` metric a CAS operation intentionally aborted. #3745
* [BUGFIX] Querier / ruler: do not log "error removing stale clients" if the ring is empty. #3761
Expand Down
4 changes: 3 additions & 1 deletion docs/blocks-storage/querier.md
Original file line number Diff line number Diff line change
Expand Up @@ -767,7 +767,9 @@ blocks_storage:
# CLI flag: -blocks-storage.tsdb.head-compaction-concurrency
[head_compaction_concurrency: <int> | default = 5]

# If TSDB head is idle for this duration, it is compacted. 0 means disabled.
# If TSDB head is idle for this duration, it is compacted. Note that up to
# 25% jitter is added to the value to avoid ingesters compacting
# concurrently. 0 means disabled.
# CLI flag: -blocks-storage.tsdb.head-compaction-idle-timeout
[head_compaction_idle_timeout: <duration> | default = 1h]

Expand Down
4 changes: 3 additions & 1 deletion docs/blocks-storage/store-gateway.md
Original file line number Diff line number Diff line change
Expand Up @@ -814,7 +814,9 @@ blocks_storage:
# CLI flag: -blocks-storage.tsdb.head-compaction-concurrency
[head_compaction_concurrency: <int> | default = 5]

# If TSDB head is idle for this duration, it is compacted. 0 means disabled.
# If TSDB head is idle for this duration, it is compacted. Note that up to
# 25% jitter is added to the value to avoid ingesters compacting
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I am just wondering, what is the reason to leave the jitter percentage unconfigurable to the user?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

We didn't have a strong opinion here, other than following how jitter has typically been added in other places.

I don't think there would be opposition to it being configurable though.

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I see. Makes sense. I agree we can make it configurable when the need arises in a separate PR.

# concurrently. 0 means disabled.
# CLI flag: -blocks-storage.tsdb.head-compaction-idle-timeout
[head_compaction_idle_timeout: <duration> | default = 1h]

Expand Down
4 changes: 3 additions & 1 deletion docs/configuration/config-file-reference.md
Original file line number Diff line number Diff line change
Expand Up @@ -4363,7 +4363,9 @@ tsdb:
# CLI flag: -blocks-storage.tsdb.head-compaction-concurrency
[head_compaction_concurrency: <int> | default = 5]

# If TSDB head is idle for this duration, it is compacted. 0 means disabled.
# If TSDB head is idle for this duration, it is compacted. Note that up to 25%
# jitter is added to the value to avoid ingesters compacting concurrently. 0
# means disabled.
# CLI flag: -blocks-storage.tsdb.head-compaction-idle-timeout
[head_compaction_idle_timeout: <duration> | default = 1h]

Expand Down
12 changes: 11 additions & 1 deletion pkg/ingester/ingester_v2.go
Original file line number Diff line number Diff line change
Expand Up @@ -45,6 +45,9 @@ import (
const (
errTSDBCreateIncompatibleState = "cannot create a new TSDB while the ingester is not in active state (current state: %s)"
errTSDBIngest = "err: %v. timestamp=%s, series=%s" // Using error.Wrap puts the message before the error and if the series is too long, it's truncated.

// compactionIdleTimeoutJitter is the jitter fraction (0.25 = 25%) applied to
// the configured idle timeout, to prevent idle compaction from happening in
// all ingesters concurrently.
compactionIdleTimeoutJitter = 0.25
)

// Shipper interface is used to have an easy way to mock it in tests.
Expand Down Expand Up @@ -355,6 +358,9 @@ type TSDBState struct {
forceCompactTrigger chan chan<- struct{}
shipTrigger chan chan<- struct{}

// Timeout chosen for idle compactions.
compactionIdleTimeout time.Duration

// Head compactions metrics.
compactionsTriggered prometheus.Counter
compactionsFailed prometheus.Counter
Expand Down Expand Up @@ -476,6 +482,10 @@ func NewV2(cfg Config, clientConfig client.Config, limits *validation.Overrides,

i.TSDBState.shipperIngesterID = i.lifecycler.ID

// Apply positive jitter only to ensure that the minimum timeout is adhered to.
i.TSDBState.compactionIdleTimeout = util.DurationWithPositiveJitter(i.cfg.BlocksStorageConfig.TSDB.HeadCompactionIdleTimeout, compactionIdleTimeoutJitter)
level.Info(i.logger).Log("msg", "TSDB idle compaction timeout set", "timeout", i.TSDBState.compactionIdleTimeout)

i.BasicService = services.NewBasicService(i.startingV2, i.updateLoop, i.stoppingV2)
return i, nil
}
Expand Down Expand Up @@ -1666,7 +1676,7 @@ func (i *Ingester) compactBlocks(ctx context.Context, force bool) {
reason = "forced"
err = userDB.compactHead(i.cfg.BlocksStorageConfig.TSDB.BlockRanges[0].Milliseconds())

case i.cfg.BlocksStorageConfig.TSDB.HeadCompactionIdleTimeout > 0 && userDB.isIdle(time.Now(), i.cfg.BlocksStorageConfig.TSDB.HeadCompactionIdleTimeout):
case i.TSDBState.compactionIdleTimeout > 0 && userDB.isIdle(time.Now(), i.TSDBState.compactionIdleTimeout):
reason = "idle"
level.Info(i.logger).Log("msg", "TSDB is idle, forcing compaction", "user", userID)
err = userDB.compactHead(i.cfg.BlocksStorageConfig.TSDB.BlockRanges[0].Milliseconds())
Expand Down
4 changes: 2 additions & 2 deletions pkg/ingester/ingester_v2_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -2373,8 +2373,8 @@ func TestIngesterCompactIdleBlock(t *testing.T) {
cortex_ingester_memory_users 1
`), memSeriesCreatedTotalName, memSeriesRemovedTotalName, "cortex_ingester_memory_users"))

// wait one second -- TSDB is now idle.
time.Sleep(cfg.BlocksStorageConfig.TSDB.HeadCompactionIdleTimeout)
// wait one second (plus maximum jitter) -- TSDB is now idle.
time.Sleep(time.Duration(float64(cfg.BlocksStorageConfig.TSDB.HeadCompactionIdleTimeout) * (1 + compactionIdleTimeoutJitter)))

i.compactBlocks(context.Background(), false)
verifyCompactedHead(t, i, true)
Expand Down
2 changes: 1 addition & 1 deletion pkg/storage/tsdb/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -154,7 +154,7 @@ func (cfg *TSDBConfig) RegisterFlags(f *flag.FlagSet) {
f.IntVar(&cfg.MaxTSDBOpeningConcurrencyOnStartup, "blocks-storage.tsdb.max-tsdb-opening-concurrency-on-startup", 10, "limit the number of concurrently opening TSDB's on startup")
f.DurationVar(&cfg.HeadCompactionInterval, "blocks-storage.tsdb.head-compaction-interval", 1*time.Minute, "How frequently does Cortex try to compact TSDB head. Block is only created if data covers smallest block range. Must be greater than 0 and max 5 minutes.")
f.IntVar(&cfg.HeadCompactionConcurrency, "blocks-storage.tsdb.head-compaction-concurrency", 5, "Maximum number of tenants concurrently compacting TSDB head into a new block")
f.DurationVar(&cfg.HeadCompactionIdleTimeout, "blocks-storage.tsdb.head-compaction-idle-timeout", 1*time.Hour, "If TSDB head is idle for this duration, it is compacted. 0 means disabled.")
f.DurationVar(&cfg.HeadCompactionIdleTimeout, "blocks-storage.tsdb.head-compaction-idle-timeout", 1*time.Hour, "If TSDB head is idle for this duration, it is compacted. Note that up to 25% jitter is added to the value to avoid ingesters compacting concurrently. 0 means disabled.")
f.IntVar(&cfg.HeadChunksWriteBufferSize, "blocks-storage.tsdb.head-chunks-write-buffer-size-bytes", chunks.DefaultWriteBufferSize, "The write buffer size used by the head chunks mapper. Lower values reduce memory utilisation on clusters with a large number of tenants at the cost of increased disk I/O operations.")
f.IntVar(&cfg.StripeSize, "blocks-storage.tsdb.stripe-size", 16384, "The number of shards of series to use in TSDB (must be a power of 2). Reducing this will decrease memory footprint, but can negatively impact performance.")
f.BoolVar(&cfg.WALCompressionEnabled, "blocks-storage.tsdb.wal-compression-enabled", false, "True to enable TSDB WAL compression.")
Expand Down
13 changes: 13 additions & 0 deletions pkg/util/time.go
Original file line number Diff line number Diff line change
Expand Up @@ -60,3 +60,16 @@ func DurationWithJitter(input time.Duration, variancePerc float64) time.Duration

return input + time.Duration(jitter)
}

// DurationWithPositiveJitter returns a random duration in the half-open
// interval [input, input + input*variancePerc). Jitter is only ever added, so
// for a positive input the result is never shorter than input.
//
// The input is returned unchanged whenever no positive jitter can be derived
// from it: when input is zero, or when input*variancePerc truncates to a value
// that is zero or negative (zero/negative variancePerc, negative input, or an
// input too small for the requested percentage).
func DurationWithPositiveJitter(input time.Duration, variancePerc float64) time.Duration {
	// No duration? No jitter.
	if input == 0 {
		return 0
	}

	variance := int64(float64(input) * variancePerc)

	// rand.Int63n panics when its argument is <= 0, so bail out early instead
	// of crashing the caller on a degenerate input/percentage combination.
	if variance <= 0 {
		return input
	}

	jitter := rand.Int63n(variance)

	return input + time.Duration(jitter)
}
14 changes: 14 additions & 0 deletions pkg/util/time_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -40,6 +40,20 @@ func TestDurationWithJitter_ZeroInputDuration(t *testing.T) {
assert.Equal(t, time.Duration(0), DurationWithJitter(time.Duration(0), 0.5))
}

// TestDurationWithPositiveJitter repeatedly draws a jittered duration for a
// one-minute input with 50% variance and checks that every sample stays
// between 60s and 90s.
func TestDurationWithPositiveJitter(t *testing.T) {
	const (
		iterations = 1000
		lowerBound = int64(60 * time.Second)
		upperBound = int64(90 * time.Second)
	)

	for run := 0; run < iterations; run++ {
		got := int64(DurationWithPositiveJitter(time.Minute, 0.5))

		assert.GreaterOrEqual(t, got, lowerBound)
		assert.LessOrEqual(t, got, upperBound)
	}
}

func TestDurationWithPositiveJitter_ZeroInputDuration(t *testing.T) {
assert.Equal(t, time.Duration(0), DurationWithPositiveJitter(time.Duration(0), 0.5))
}

func TestParseTime(t *testing.T) {
var tests = []struct {
input string
Expand Down