Merged
Changes from 2 commits
1 change: 1 addition & 0 deletions CHANGELOG.md
@@ -29,6 +29,7 @@
* [ENHANCEMENT] Blocks storage: enabled caching of `meta.json` attributes, configurable via `-blocks-storage.bucket-store.metadata-cache.metafile-attributes-ttl`. #3528
* [ENHANCEMENT] Compactor: added a config validation check to fail fast if the compactor has been configured with invalid block range periods (each period is expected to be a multiple of the previous one). #3534
* [ENHANCEMENT] Blocks storage: concurrently fetch deletion marks from object storage. #3538
* [ENHANCEMENT] Blocks storage ingester: the ingester can now close an idle TSDB and delete its local data, controlled via `-blocks-storage.tsdb.close-idle-tsdb-timeout`. #3491
* [BUGFIX] Blocks storage ingester: fixed some cases leading to a TSDB WAL corruption after a partial write to disk. #3423
* [BUGFIX] Blocks storage: Fix the race between ingestion and `/flush` call resulting in overlapping blocks. #3422
* [BUGFIX] Querier: fixed `-querier.max-query-into-future` which wasn't correctly enforced on range queries. #3452
9 changes: 9 additions & 0 deletions docs/blocks-storage/querier.md
@@ -653,6 +653,15 @@ blocks_storage:
# CLI flag: -blocks-storage.tsdb.flush-blocks-on-shutdown
[flush_blocks_on_shutdown: <boolean> | default = false]

# If TSDB has not received any data for this duration, and all blocks from
# TSDB have been shipped, TSDB is closed and deleted from local disk. If set
# to a positive value, this value should be equal to or higher than the
# -querier.query-ingesters-within flag, to make sure that TSDB is not closed
# prematurely, which could cause partial query results. A value of 0 or less
# disables closing of idle TSDBs.
# CLI flag: -blocks-storage.tsdb.close-idle-tsdb-timeout
[close_idle_tsdb_timeout: <duration> | default = 0s]
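# For example (illustrative values, not defaults or recommendations): if the
# queriers in a cluster run with -querier.query-ingesters-within=13h, the
# idle timeout could be set to the same value or higher, e.g.:
#
#   blocks_storage:
#     tsdb:
#       close_idle_tsdb_timeout: 13h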

# limit the number of concurrently opening TSDBs on startup
# CLI flag: -blocks-storage.tsdb.max-tsdb-opening-concurrency-on-startup
[max_tsdb_opening_concurrency_on_startup: <int> | default = 10]
9 changes: 9 additions & 0 deletions docs/blocks-storage/store-gateway.md
@@ -703,6 +703,15 @@ blocks_storage:
# CLI flag: -blocks-storage.tsdb.flush-blocks-on-shutdown
[flush_blocks_on_shutdown: <boolean> | default = false]

# If TSDB has not received any data for this duration, and all blocks from
# TSDB have been shipped, TSDB is closed and deleted from local disk. If set
# to a positive value, this value should be equal to or higher than the
# -querier.query-ingesters-within flag, to make sure that TSDB is not closed
# prematurely, which could cause partial query results. A value of 0 or less
# disables closing of idle TSDBs.
# CLI flag: -blocks-storage.tsdb.close-idle-tsdb-timeout
[close_idle_tsdb_timeout: <duration> | default = 0s]

# limit the number of concurrently opening TSDBs on startup
# CLI flag: -blocks-storage.tsdb.max-tsdb-opening-concurrency-on-startup
[max_tsdb_opening_concurrency_on_startup: <int> | default = 10]
9 changes: 9 additions & 0 deletions docs/configuration/config-file-reference.md
@@ -3855,6 +3855,15 @@ tsdb:
# CLI flag: -blocks-storage.tsdb.flush-blocks-on-shutdown
[flush_blocks_on_shutdown: <boolean> | default = false]

# If TSDB has not received any data for this duration, and all blocks from
# TSDB have been shipped, TSDB is closed and deleted from local disk. If set
# to a positive value, this value should be equal to or higher than the
# -querier.query-ingesters-within flag, to make sure that TSDB is not closed
# prematurely, which could cause partial query results. A value of 0 or less
# disables closing of idle TSDBs.
# CLI flag: -blocks-storage.tsdb.close-idle-tsdb-timeout
[close_idle_tsdb_timeout: <duration> | default = 0s]

# limit the number of concurrently opening TSDBs on startup
# CLI flag: -blocks-storage.tsdb.max-tsdb-opening-concurrency-on-startup
[max_tsdb_opening_concurrency_on_startup: <int> | default = 10]
232 changes: 211 additions & 21 deletions pkg/ingester/ingester_v2.go
@@ -48,6 +48,31 @@ type Shipper interface {
Sync(ctx context.Context) (uploaded int, err error)
}

type tsdbState int

const (
active tsdbState = iota // Pushes are allowed only in this state.
forceCompacting // TSDB is being force-compacted.
closing // Used while closing idle TSDB.
closed // Used to avoid setting closing back to active in closeAndDeleteUserTSDBIfIdle method.
)

// Describes result of TSDB idle check (for closing). String is used as metric label.
type tsdbIdleCheckResult string

const (
tsdbIdle tsdbIdleCheckResult = "idle" // Not reported via metrics. Metrics use tsdbIdleClosed on success.
tsdbShippingDisabled tsdbIdleCheckResult = "shipping_disabled"
tsdbNotIdle tsdbIdleCheckResult = "not_idle"
tsdbNotCompacted tsdbIdleCheckResult = "not_compacted"
tsdbNotShipped tsdbIdleCheckResult = "not_shipped"
tsdbCheckFailed tsdbIdleCheckResult = "check_failed"
tsdbCloseFailed tsdbIdleCheckResult = "close_failed"
tsdbNotActive tsdbIdleCheckResult = "not_active"
tsdbDataRemovalFailed tsdbIdleCheckResult = "data_removal_failed"
tsdbIdleClosed tsdbIdleCheckResult = "idle_closed" // Success.
)

type userTSDB struct {
db *tsdb.DB
userID string
@@ -56,13 +81,12 @@ type userTSDB struct {
seriesInMetric *metricCounter
limiter *Limiter

forcedCompactionInProgressMtx sync.RWMutex
forcedCompactionInProgress bool

pushesInFlight sync.WaitGroup
stateMtx sync.RWMutex
state tsdbState
pushesInFlight sync.WaitGroup // Increased with Read lock held, only if state == active.

// Used to detect idle TSDBs.
lastUpdate *atomic.Int64
lastUpdate atomic.Int64

// Thanos shipper used to ship blocks to the storage.
shipper Shipper
@@ -102,16 +126,25 @@ func (u *userTSDB) StartTime() (int64, error) {
return u.db.StartTime()
}

func (u *userTSDB) casState(from, to tsdbState) bool {
u.stateMtx.Lock()
defer u.stateMtx.Unlock()

if u.state != from {
return false
}
u.state = to
return true
}

// compactHead compacts the Head block at specified block durations avoiding a single huge block.
func (u *userTSDB) compactHead(blockDuration int64) error {
u.forcedCompactionInProgressMtx.Lock()
u.forcedCompactionInProgress = true
u.forcedCompactionInProgressMtx.Unlock()
defer func() {
u.forcedCompactionInProgressMtx.Lock()
u.forcedCompactionInProgress = false
u.forcedCompactionInProgressMtx.Unlock()
}()
if !u.casState(active, forceCompacting) {
return errors.New("TSDB head cannot be compacted because it is not in active state (possibly being closed)")
}

defer u.casState(forceCompacting, active)

// Ingestion of samples in parallel with forced compaction can lead to overlapping blocks.
// So we wait for existing in-flight requests to finish. Future push requests would fail until compaction is over.
u.pushesInFlight.Wait()
@@ -190,22 +223,31 @@ func (u *userTSDB) blocksToDelete(blocks []*tsdb.Block) map[ulid.ULID]struct{} {
return deletable
}

shipperMeta, err := shipper.ReadMetaFile(u.db.Dir())
shippedBlocks, err := u.getShippedBlocks()
if err != nil {
// If there is any issue with the shipper, we should be conservative and not delete anything.
level.Error(util.Logger).Log("msg", "failed to read shipper meta during deletion of blocks", "user", u.userID, "err", err)
return nil
}

result := map[ulid.ULID]struct{}{}
for _, shippedID := range shipperMeta.Uploaded {
for _, shippedID := range shippedBlocks {
if _, ok := deletable[shippedID]; ok {
result[shippedID] = struct{}{}
}
}
return result
}

func (u *userTSDB) getShippedBlocks() ([]ulid.ULID, error) {
shipperMeta, err := shipper.ReadMetaFile(u.db.Dir())
if err != nil {
return nil, err
}

return shipperMeta.Uploaded, nil
}

func (u *userTSDB) isIdle(now time.Time, idle time.Duration) bool {
lu := u.lastUpdate.Load()

@@ -216,6 +258,37 @@ func (u *userTSDB) setLastUpdate(t time.Time) {
u.lastUpdate.Store(t.Unix())
}

// Reports tsdbIdle if TSDB can be closed, or some other tsdbIdleCheckResult.
func (u *userTSDB) canIdleClose(idleTimeout time.Duration) (tsdbIdleCheckResult, error) {
if !u.isIdle(time.Now(), idleTimeout) {
[Inline review thread on this check]

Contributor:
I just realised this check has a problem: the lastUpdate is not correct after an ingester restart. An option could be checking the timestamp against max(lastUpdate, TSDB max time).

Contributor (Author):
When TSDB is opened, we set lastUpdate to current time to avoid premature idle flush and close. This will only be a problem if ingesters never stay up for longer than configured idle timeout.

Contributor (Author):
Instead of current time, we can set it to last sample time from TSDB when opening, as you suggest.

Contributor:
> When TSDB is opened, we set lastUpdate to current time to avoid premature idle flush and close.

Then we're protected 👍

> Instead of current time, we can set it to last sample time from TSDB when opening, as you suggest.

Could make sense.

[End of review thread — a sketch of the suggested lastUpdate seeding follows the canIdleClose function below.]

return tsdbNotIdle, nil
}

// If head is not compacted, we cannot close this yet.
if u.Head().NumSeries() > 0 {
return tsdbNotCompacted, nil
}

// Verify that all blocks have been shipped.
shipped, err := u.getShippedBlocks()
if err != nil {
return tsdbCheckFailed, errors.Wrapf(err, "failed to read shipper meta")
}

shippedMap := make(map[ulid.ULID]bool, len(shipped))
for _, b := range shipped {
shippedMap[b] = true
}

for _, b := range u.Blocks() {
if !shippedMap[b.Meta().ULID] {
return tsdbNotShipped, nil
}
}

return tsdbIdle, nil
}
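
Following up on the review thread above about lastUpdate being reset by an ingester restart: below is a minimal, hypothetical sketch of the reviewer's suggestion to seed lastUpdate from the data already in the re-opened TSDB rather than from the wall clock. The helper name seedLastUpdateFromHead is invented for illustration, the snippet assumes the "math" and "time" imports plus the userTSDB type from this file, and whether the PR ultimately adopts this approach is not shown in these commits.

// Hypothetical sketch, not part of this PR's diff: seed lastUpdate from the
// newest sample in the re-opened TSDB head, so a restart does not make an
// already idle tenant look recently active.
func (u *userTSDB) seedLastUpdateFromHead() {
	lastUpdate := time.Now()

	// Head().MaxTime() returns the newest sample timestamp in milliseconds;
	// it is math.MinInt64 while the head is still empty.
	if maxt := u.Head().MaxTime(); maxt != math.MinInt64 {
		if t := time.Unix(0, maxt*int64(time.Millisecond)); t.Before(lastUpdate) {
			lastUpdate = t
		}
	}

	u.setLastUpdate(lastUpdate)
}

createTSDB could then call this helper right after opening the DB instead of stamping the current time (which, per the thread, is what happens today when a TSDB is opened); as the thread also notes, the current-time approach is only a problem if ingesters never stay up for longer than the configured idle timeout.
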

// TSDBState holds data structures used by the TSDB storage engine
type TSDBState struct {
dbs map[string]*userTSDB // tsdb sharded by userID
@@ -238,9 +311,28 @@ type TSDBState struct {
appenderAddDuration prometheus.Histogram
appenderCommitDuration prometheus.Histogram
refCachePurgeDuration prometheus.Histogram

// Idle TSDB metrics.
idleTsdbChecks prometheus.Counter
idleTsdbCheckResult *prometheus.CounterVec
}

func newTSDBState(bucketClient objstore.Bucket, registerer prometheus.Registerer) TSDBState {
idleTsdbCheckResult := promauto.With(registerer).NewCounterVec(prometheus.CounterOpts{
Name: "cortex_ingester_idle_tsdb_check_results_total",
Help: "The total number of various results for idle TSDB checks.",
}, []string{"result"})

idleTsdbCheckResult.WithLabelValues(string(tsdbShippingDisabled))
idleTsdbCheckResult.WithLabelValues(string(tsdbNotIdle))
idleTsdbCheckResult.WithLabelValues(string(tsdbNotCompacted))
idleTsdbCheckResult.WithLabelValues(string(tsdbNotShipped))
idleTsdbCheckResult.WithLabelValues(string(tsdbCheckFailed))
idleTsdbCheckResult.WithLabelValues(string(tsdbCloseFailed))
idleTsdbCheckResult.WithLabelValues(string(tsdbNotActive))
idleTsdbCheckResult.WithLabelValues(string(tsdbDataRemovalFailed))
idleTsdbCheckResult.WithLabelValues(string(tsdbIdleClosed))

return TSDBState{
dbs: make(map[string]*userTSDB),
bucket: bucketClient,
@@ -277,6 +369,12 @@ func newTSDBState(bucketClient objstore.Bucket, registerer prometheus.Registerer
Help: "The total time it takes to purge the TSDB series reference cache for a single tenant.",
Buckets: prometheus.DefBuckets,
}),

idleTsdbChecks: promauto.With(registerer).NewCounter(prometheus.CounterOpts{
Name: "cortex_ingester_idle_tsdb_checks_total",
Help: "The total number of checks for idle TSDB.",
}),
idleTsdbCheckResult: idleTsdbCheckResult,
}
}

@@ -394,6 +492,15 @@ func (i *Ingester) startingV2(ctx context.Context) error {
servs = append(servs, shippingService)
}

if i.cfg.BlocksStorageConfig.TSDB.CloseIdleTSDBTimeout > 0 {
interval := i.cfg.BlocksStorageConfig.TSDB.CloseIdleTSDBInterval
if interval == 0 {
interval = cortex_tsdb.DefaultCloseIdleTSDBInterval
}
closeIdleService := services.NewTimerService(interval, nil, i.closeAndDeleteIdleUserTSDBs, nil)
servs = append(servs, closeIdleService)
}

var err error
i.TSDBState.subservices, err = services.NewManager(servs...)
if err == nil {
@@ -680,11 +787,18 @@ func (i *Ingester) v2Push(ctx context.Context, req *client.WriteRequest) (*clien
}

func (u *userTSDB) acquireAppendLock() error {
u.forcedCompactionInProgressMtx.RLock()
defer u.forcedCompactionInProgressMtx.RUnlock()

if u.forcedCompactionInProgress {
return errors.New("forced compaction in progress")
u.stateMtx.RLock()
defer u.stateMtx.RUnlock()

if u.state != active {
switch u.state {
case forceCompacting:
return errors.New("forced compaction in progress")
case closing:
return errors.New("TSDB is closing")
default:
return errors.New("TSDB is not active")
}
}

u.pushesInFlight.Add(1)
@@ -1090,7 +1204,6 @@ func (i *Ingester) createTSDB(userID string) (*userTSDB, error) {
seriesInMetric: newMetricCounter(i.limiter),
ingestedAPISamples: newEWMARate(0.2, i.cfg.RateUpdatePeriod),
ingestedRuleSamples: newEWMARate(0.2, i.cfg.RateUpdatePeriod),
lastUpdate: atomic.NewInt64(0),
}

// Create a new user database
@@ -1438,6 +1551,83 @@ func (i *Ingester) compactBlocks(ctx context.Context, force bool) {
})
}

func (i *Ingester) closeAndDeleteIdleUserTSDBs(ctx context.Context) error {
for _, userID := range i.getTSDBUsers() {
if ctx.Err() != nil {
return nil
}

i.TSDBState.idleTsdbChecks.Inc()

result := i.closeAndDeleteUserTSDBIfIdle(userID)

i.TSDBState.idleTsdbCheckResult.WithLabelValues(string(result)).Inc()
}

return nil
}

func (i *Ingester) closeAndDeleteUserTSDBIfIdle(userID string) tsdbIdleCheckResult {
userDB := i.getTSDB(userID)
if userDB == nil || userDB.shipper == nil {
// We will not delete local data when not using shipping to storage.
return tsdbShippingDisabled
}

if result, err := userDB.canIdleClose(i.cfg.BlocksStorageConfig.TSDB.CloseIdleTSDBTimeout); result != tsdbIdle {
if err != nil {
level.Error(util.Logger).Log("msg", "cannot close idle TSDB", "user", userID, "err", err)
}
return result
}

// This disables pushes and force-compactions.
if !userDB.casState(active, closing) {
return tsdbNotActive
}

// If TSDB is fully closed, we will set state to 'closed', which will prevent this deferred closing -> active transition.
defer userDB.casState(closing, active)

// Make sure we don't ignore any possible inflight pushes.
userDB.pushesInFlight.Wait()

// Verify again, things may have changed during the checks and pushes.
if result, err := userDB.canIdleClose(i.cfg.BlocksStorageConfig.TSDB.CloseIdleTSDBTimeout); result != tsdbIdle {
if err != nil {
level.Error(util.Logger).Log("msg", "cannot close idle TSDB", "user", userID, "err", err)
}
return result
}

dir := userDB.db.Dir()

if err := userDB.Close(); err != nil {
level.Error(util.Logger).Log("msg", "failed to close idle TSDB", "user", userID, "err", err)
return tsdbCloseFailed
}

level.Info(util.Logger).Log("msg", "closed idle TSDB", "user", userID)

// This will prevent going back to the "active" state in the deferred statement.
userDB.casState(closing, closed)

i.userStatesMtx.Lock()
delete(i.TSDBState.dbs, userID)
i.userStatesMtx.Unlock()

i.TSDBState.tsdbMetrics.removeRegistryForUser(userID)

// And delete local data.
if err := os.RemoveAll(dir); err != nil {
level.Error(util.Logger).Log("msg", "failed to delete idle TSDB", "user", userID, "err", err)
return tsdbDataRemovalFailed
}

level.Info(util.Logger).Log("msg", "deleted data for idle TSDB", "user", userID, "dir", dir)
return tsdbIdleClosed
}

// This method will flush all data. It is called as part of Lifecycler's shutdown (if flush on shutdown is configured), or from the flusher.
//
// When called during Lifecycler shutdown, this happens as part of normal Ingester shutdown (see stoppingV2 method).