diff --git a/CHANGELOG.md b/CHANGELOG.md
index 6eb792133dd..bb6c00bdc0d 100644
--- a/CHANGELOG.md
+++ b/CHANGELOG.md
@@ -23,6 +23,8 @@
 * [FEATURE] Added readiness probe endpoint`/ready` to queriers. #1934
 * [FEATURE] EXPERIMENTAL: Added `/series` API endpoint support with TSDB blocks storage. #1830
 * [FEATURE] Added "multi" KV store that can interact with two other KV stores, primary one for all reads and writes, and secondary one, which only receives writes. Primary/secondary store can be modified in runtime via runtime-config mechanism (previously "overrides"). #1749
+* [FEATURE] EXPERIMENTAL: Close and delete stale TSDBs that have not received writes for a configurable period. #1958
+  * `--experimental.tsdb.max-stale-age`
 * [ENHANCEMENT] Added `password` and `enable_tls` options to redis cache configuration. Enables usage of Microsoft Azure Cache for Redis service.
 * [ENHANCEMENT] Experimental TSDB: Open existing TSDB on startup to prevent ingester from becoming ready before it can accept writes. #1917
   * `--experimental.tsdb.max-tsdb-opening-concurrency-on-startup`
diff --git a/pkg/ingester/ingester.go b/pkg/ingester/ingester.go
index 1ada90d16bf..85416760e1e 100644
--- a/pkg/ingester/ingester.go
+++ b/pkg/ingester/ingester.go
@@ -43,6 +43,7 @@ type ingesterMetrics struct {
 	queriedSamples  prometheus.Histogram
 	queriedSeries   prometheus.Histogram
 	queriedChunks   prometheus.Histogram
+	cleanupDuration prometheus.Histogram
 }
 
 func newIngesterMetrics(r prometheus.Registerer) *ingesterMetrics {
@@ -81,6 +82,11 @@ func newIngesterMetrics(r prometheus.Registerer) *ingesterMetrics {
 			// A small number of chunks per series - 10*(8^(7-1)) = 2.6m.
 			Buckets: prometheus.ExponentialBuckets(10, 8, 7),
 		}),
+		cleanupDuration: prometheus.NewHistogram(prometheus.HistogramOpts{
+			Name:    "cortex_ingester_cleanup_duration_seconds",
+			Help:    "Time (in seconds) spent in a single stale TSDB cleanup pass.",
+			Buckets: prometheus.DefBuckets,
+		}),
 	}
 
 	if r != nil {
@@ -92,6 +98,7 @@ func newIngesterMetrics(r prometheus.Registerer) *ingesterMetrics {
 			m.queriedSamples,
 			m.queriedSeries,
 			m.queriedChunks,
+			m.cleanupDuration,
 		)
 	}
diff --git a/pkg/ingester/ingester_v2.go b/pkg/ingester/ingester_v2.go
index 11bf7f1eb25..fda3be7fd0b 100644
--- a/pkg/ingester/ingester_v2.go
+++ b/pkg/ingester/ingester_v2.go
@@ -3,6 +3,7 @@ package ingester
 import (
 	"fmt"
 	"io"
+	"io/ioutil"
 	"net/http"
 	"os"
 	"path/filepath"
@@ -34,9 +35,28 @@ const (
 	errTSDBCreateIncompatibleState = "cannot create a new TSDB while the ingester is not in active state (current state: %s)"
 )
 
+const (
+	active = iota
+	closing
+)
+
+// UserTSDB holds a single user's TSDB together with the bookkeeping needed to ship its blocks and close it once it goes stale.
+type UserTSDB struct {
+	*tsdb.DB
+	sync.WaitGroup
+
+	shipperCancel context.CancelFunc
+	shipperDone   *sync.WaitGroup
+
+	firstEmpty time.Time
+
+	// state indicates if the TSDB is currently active or in the process of closing.
+ state int +} + // TSDBState holds data structures used by the TSDB storage engine type TSDBState struct { - dbs map[string]*tsdb.DB // tsdb sharded by userID + dbs map[string]*UserTSDB // tsdb sharded by userID bucket objstore.Bucket // Keeps count of in-flight requests @@ -63,7 +83,7 @@ func NewV2(cfg Config, clientConfig client.Config, limits *validation.Overrides, quit: make(chan struct{}), TSDBState: TSDBState{ - dbs: make(map[string]*tsdb.DB), + dbs: make(map[string]*UserTSDB), bucket: bucketClient, }, } @@ -85,6 +105,9 @@ func NewV2(cfg Config, clientConfig client.Config, limits *validation.Overrides, // Now that user states have been created, we can start the lifecycler i.lifecycler.Start() + // Start a cleanup routine that will close and remove TSDB's that are no longer active + i.startCleanupLoop() + return i, nil } @@ -101,6 +124,7 @@ func (i *Ingester) v2Push(ctx old_ctx.Context, req *client.WriteRequest) (*clien if err != nil { return nil, wrapWithUser(err, userID) } + defer db.Done() // Ensure the ingester shutdown procedure hasn't started i.userStatesMtx.RLock() @@ -195,6 +219,7 @@ func (i *Ingester) v2Query(ctx old_ctx.Context, req *client.QueryRequest) (*clie if db == nil { return &client.QueryResponse{}, nil } + defer db.Done() q, err := db.Querier(int64(from), int64(through)) if err != nil { @@ -243,6 +268,7 @@ func (i *Ingester) v2LabelValues(ctx old_ctx.Context, req *client.LabelValuesReq if db == nil { return &client.LabelValuesResponse{}, nil } + defer db.Done() through := time.Now() from := through.Add(-i.cfg.TSDBConfig.Retention) @@ -272,6 +298,7 @@ func (i *Ingester) v2LabelNames(ctx old_ctx.Context, req *client.LabelNamesReque if db == nil { return &client.LabelNamesResponse{}, nil } + defer db.Done() through := time.Now() from := through.Add(-i.cfg.TSDBConfig.Retention) @@ -301,6 +328,7 @@ func (i *Ingester) v2MetricsForLabelMatchers(ctx old_ctx.Context, req *client.Me if db == nil { return &client.MetricsForLabelMatchersResponse{}, nil } + defer db.Done() // Parse the request from, to, matchersSet, err := client.FromMetricsForLabelMatchersRequest(req) @@ -358,14 +386,20 @@ func (i *Ingester) v2MetricsForLabelMatchers(ctx old_ctx.Context, req *client.Me return result, nil } -func (i *Ingester) getTSDB(userID string) *tsdb.DB { +func (i *Ingester) getTSDB(userID string) *UserTSDB { i.userStatesMtx.RLock() defer i.userStatesMtx.RUnlock() - db, _ := i.TSDBState.dbs[userID] + db, ok := i.TSDBState.dbs[userID] + if ok { + if db.state == closing { + return nil + } + db.Add(1) + } return db } -func (i *Ingester) getOrCreateTSDB(userID string, force bool) (*tsdb.DB, error) { +func (i *Ingester) getOrCreateTSDB(userID string, force bool) (*UserTSDB, error) { db := i.getTSDB(userID) if db != nil { return db, nil @@ -378,6 +412,10 @@ func (i *Ingester) getOrCreateTSDB(userID string, force bool) (*tsdb.DB, error) var ok bool db, ok = i.TSDBState.dbs[userID] if ok { + if db.state == closing { + return nil, fmt.Errorf("tsdb is closing") + } + db.Add(1) return db, nil } @@ -391,23 +429,12 @@ func (i *Ingester) getOrCreateTSDB(userID string, force bool) (*tsdb.DB, error) return nil, fmt.Errorf(errTSDBCreateIncompatibleState, ingesterState) } - // Create the database and a shipper for a user - db, err := i.createTSDB(userID) - if err != nil { - return nil, err - } - - // Add the db to list of user databases - i.TSDBState.dbs[userID] = db - return db, nil -} - -// createTSDB creates a TSDB for a given userID, and returns the created db. 
-func (i *Ingester) createTSDB(userID string) (*tsdb.DB, error) { + db = &UserTSDB{} udir := i.cfg.TSDBConfig.BlocksDir(userID) // Create a new user database - db, err := tsdb.Open(udir, util.Logger, nil, &tsdb.Options{ + var err error + db.DB, err = tsdb.Open(udir, util.Logger, nil, &tsdb.Options{ RetentionDuration: uint64(i.cfg.TSDBConfig.Retention / time.Millisecond), BlockRanges: i.cfg.TSDBConfig.BlockRanges.ToMillisecondRanges(), NoLockfile: true, @@ -415,34 +442,13 @@ func (i *Ingester) createTSDB(userID string) (*tsdb.DB, error) { if err != nil { return nil, err } - - // Thanos shipper requires at least 1 external label to be set. For this reason, - // we set the tenant ID as external label and we'll filter it out when reading - // the series from the storage. - l := labels.Labels{ - { - Name: cortex_tsdb.TenantIDExternalLabel, - Value: userID, - }, - } + // Save the database + i.TSDBState.dbs[userID] = db // Create a new shipper for this database - if i.cfg.TSDBConfig.ShipInterval > 0 { - s := shipper.New(util.Logger, nil, udir, &Bucket{userID, i.TSDBState.bucket}, func() labels.Labels { return l }, metadata.ReceiveSource) - i.done.Add(1) - go func() { - defer i.done.Done() - if err := runutil.Repeat(i.cfg.TSDBConfig.ShipInterval, i.quit, func() error { - if uploaded, err := s.Sync(context.Background()); err != nil { - level.Warn(util.Logger).Log("err", err, "uploaded", uploaded) - } - return nil - }); err != nil { - level.Warn(util.Logger).Log("err", err) - } - }() - } + i.startShipper(userID) + db.Add(1) return db, nil } @@ -456,7 +462,7 @@ func (i *Ingester) closeAllTSDB() { for userID, db := range i.TSDBState.dbs { userID := userID - go func(db *tsdb.DB) { + go func(db *UserTSDB) { defer wg.Done() if err := db.Close(); err != nil { @@ -523,16 +529,12 @@ func (i *Ingester) openExistingTSDB(ctx context.Context) error { go func(userID string) { defer wg.Done() defer openGate.Done() - db, err := i.createTSDB(userID) + db, err := i.getOrCreateTSDB(userID, true) if err != nil { level.Error(util.Logger).Log("msg", "unable to open user TSDB", "err", err, "user", userID) return } - - // Add the database to the map of user databases - i.userStatesMtx.Lock() - i.TSDBState.dbs[userID] = db - i.userStatesMtx.Unlock() + db.Done() }(userID) @@ -548,3 +550,196 @@ func (i *Ingester) openExistingTSDB(ctx context.Context) error { } return err } + +// closeStaleTSDBs walks the TSDB directory for each user to determine if the TSDB should be closed. +// If +// - The directory is empty of blocks +// OR +// - The directory has every block marked as shipped (if required) +// AND +// - The TSDB head has been empty for the given period +// THEN +// - Close the TSDB, stop the shipper routine, and delete the directory and it's contents. +func (i *Ingester) closeStaleTSDBs(stopc <-chan struct{}) error { + now := time.Now() + defer func() { + i.metrics.cleanupDuration.Observe(time.Since(now).Seconds()) + }() + + // Get users from disk, that way if cleanup fails after the tsdb has been closed we'll still finish cleanup + userIDs, err := ioutil.ReadDir(i.cfg.TSDBConfig.Dir) + if err != nil { + return err + } + + for _, info := range userIDs { + select { + case <-stopc: + return nil + default: + if i.shouldCloseTSDB(info.Name(), now) { + i.closeTSDB(info.Name()) + } + } + } + + return nil +} + +// shouldCloseTSDB performs the majority of checks against a tsdb without having to hold the lock. 
+func (i *Ingester) shouldCloseTSDB(user string, now time.Time) bool { + db := i.getTSDB(user) + if db == nil { + return true + } + defer db.Done() + + // DB not empty, skip it. + if db.Head().NumSeries() != 0 { + db.firstEmpty = time.Time{} // reset first empty to never + return false + } + + if db.firstEmpty.IsZero() { + db.firstEmpty = now + } + + // If the TSDB hasn't been empty for the required stale age, don't close it. + // TSDB has received a write within the window. + if now.Sub(db.firstEmpty) <= i.cfg.TSDBConfig.MaxStaleAge { + return false + } + + blocksDir := i.cfg.TSDBConfig.BlocksDir(user) + unshipped, err := unshippedUserBlocks(blocksDir) + if err != nil { + level.Warn(util.Logger).Log("msg", "failed to read unshipped blocks", "err", err, "user", user) + return false + } + + // Ensure there are no unshipped blocks + if len(unshipped) != 0 && i.cfg.TSDBConfig.ShipInterval > 0 { + return false + } + + return true +} + +// closeTSDB holds the userStatesMtx lock and shuts down a given TSDB. +func (i *Ingester) closeTSDB(user string) { + db := i.setTSDBState(user, closing) // mark the db as closing to prevent further accesses + if db != nil { + + // Wait for all accesses to the db to finish + db.Wait() + + // DB not empty, write must have happened in between shouldCloseTSDB and closeTSDB + if db.Head().NumSeries() != 0 { + db.firstEmpty = time.Time{} // reset first empty to never + i.setTSDBState(user, active) // mark tsdb as still active + return + } + + // Stop the shipper routine for this TSDB + if db.shipperCancel != nil { + db.shipperCancel() + db.shipperDone.Wait() + } + + db.DisableCompactions() + + // Close the TSDB + if err := db.Close(); err != nil { + // Restart shipper and compactor to roll-back the shutdown changes that have been made in the event this db becomes active again + db.EnableCompactions() + i.startShipper(user) + i.setTSDBState(user, active) // mark tsdb as still active + level.Warn(util.Logger).Log("msg", "unable to close TSDB", "err", err, "user", user) + return + } + + // DB is shutdown, remove it from the list + i.userStatesMtx.Lock() + delete(i.TSDBState.dbs, user) + i.userStatesMtx.Unlock() + } + + // Cleanup the on-disk contents + if err := os.RemoveAll(i.cfg.TSDBConfig.BlocksDir(user)); err != nil { + level.Warn(util.Logger).Log("msg", "failed to remove all user tsdb dir", "err", err, "user", user) + return + } +} + +// startShipper starts a shipper for a given user TSDB. It does not obtain the lock, callers need to ensure they hold the lock before calling. +func (i *Ingester) startShipper(userID string) { + if i.cfg.TSDBConfig.ShipInterval <= 0 { + return + } + + s := i.createShipper(userID, i.cfg.TSDBConfig.BlocksDir(userID)) + i.done.Add(1) + ctx, cancel := context.WithCancel(context.Background()) + wg := &sync.WaitGroup{} + wg.Add(1) + go func() { + defer i.done.Done() + defer wg.Done() + err := runutil.Repeat(i.cfg.TSDBConfig.ShipInterval, i.quit, func() error { + if uploaded, err := s.Sync(ctx); err != nil && err != context.Canceled { + level.Warn(util.Logger).Log("err", err, "uploaded", uploaded) + } + return nil + }) + if err != nil { + level.Warn(util.Logger).Log("err", err, "msg", "error while shipping") + } + }() + + i.TSDBState.dbs[userID].shipperDone = wg + i.TSDBState.dbs[userID].shipperCancel = cancel +} + +func (i *Ingester) createShipper(userID, dir string) *shipper.Shipper { + + // Thanos shipper requires at least 1 external label to be set. 
For this reason, + // we set the tenant ID as external label and we'll filter it out when reading + // the series from the storage. + l := labels.Labels{ + { + Name: cortex_tsdb.TenantIDExternalLabel, + Value: userID, + }, + } + + // Create a new shipper for this database + return shipper.New(util.Logger, nil, dir, &Bucket{userID, i.TSDBState.bucket}, func() labels.Labels { return l }, metadata.ReceiveSource) +} + +func (i *Ingester) startCleanupLoop() { + i.done.Add(1) + go func() { + defer i.done.Done() + err := runutil.Repeat(time.Minute, i.quit, func() error { + if err := i.closeStaleTSDBs(i.quit); err != nil { + level.Warn(util.Logger).Log("err", err, "msg", "cleanup failed") + } + return nil + }) + if err != nil { + level.Warn(util.Logger).Log("err", err, "msg", "error during cleanup") + } + }() +} + +// setTSDBState sets the TSDB for the given user to the specified state and returns the db without increasing the WaitGroup +func (i *Ingester) setTSDBState(user string, state int) *UserTSDB { + i.userStatesMtx.Lock() + db, ok := i.TSDBState.dbs[user] + if ok { + db.state = state + } + i.userStatesMtx.Unlock() + + return db +} diff --git a/pkg/ingester/ingester_v2_test.go b/pkg/ingester/ingester_v2_test.go index 2df6f4d0edd..1897a305d2a 100644 --- a/pkg/ingester/ingester_v2_test.go +++ b/pkg/ingester/ingester_v2_test.go @@ -4,15 +4,18 @@ import ( "fmt" "io/ioutil" "math" + "math/rand" "net/http" "os" "path/filepath" "strings" + "sync" "testing" "time" "github.com/cortexproject/cortex/pkg/ingester/client" "github.com/cortexproject/cortex/pkg/ring" + "github.com/cortexproject/cortex/pkg/util" "github.com/cortexproject/cortex/pkg/util/test" "github.com/cortexproject/cortex/pkg/util/validation" "github.com/pkg/errors" @@ -23,6 +26,7 @@ import ( "github.com/prometheus/prometheus/tsdb" "github.com/stretchr/testify/assert" "github.com/stretchr/testify/require" + "github.com/thanos-io/thanos/pkg/shipper" "github.com/weaveworks/common/httpgrpc" "github.com/weaveworks/common/user" "golang.org/x/net/context" @@ -823,3 +827,153 @@ func TestIngester_v2LoadTSDBOnStartup(t *testing.T) { }) } } + +func TestCleanupTSDB(t *testing.T) { + dir, err := ioutil.TempDir("", "tsdb") + require.NoError(t, err) + defer os.RemoveAll(dir) + + clientCfg := defaultClientTestConfig() + limits := defaultLimitsTestConfig() + + overrides, err := validation.NewOverrides(limits, nil) + require.NoError(t, err) + ingesterCfg := defaultIngesterTestConfig() + ingesterCfg.TSDBEnabled = true + ingesterCfg.TSDBConfig.Dir = dir + ingesterCfg.TSDBConfig.Backend = "s3" + ingesterCfg.TSDBConfig.S3.Endpoint = "localhost" + ingesterCfg.TSDBConfig.MaxStaleAge = 10 * time.Minute + + ingester := &Ingester{ + cfg: ingesterCfg, + metrics: newIngesterMetrics(nil), + clientConfig: clientCfg, + limits: overrides, + chunkStore: nil, + quit: make(chan struct{}), + + TSDBState: TSDBState{ + dbs: make(map[string]*UserTSDB), + }, + } + ingester.lifecycler, err = ring.NewLifecycler(ingesterCfg.LifecyclerConfig, ingester, "ingester", ring.IngesterRingKey) + require.NoError(t, err) + + ingester.limiter = NewSeriesLimiter(overrides, ingester.lifecycler, ingesterCfg.LifecyclerConfig.RingConfig.ReplicationFactor, ingesterCfg.ShardByAllLabels) + ingester.userStates = newUserStates(ingester.limiter, ingesterCfg) + ingester.lifecycler.Start() + + // Wait until the ingester is ACTIVE + test.Poll(t, 100*time.Millisecond, ring.ACTIVE, func() interface{} { + return ingester.lifecycler.GetState() + }) + + tests := map[string]struct { + shippingOff bool + blocks 
int
+		unshipped     int
+		recentlyWrite bool
+		removed       bool
+	}{
+		"no unshipped recently written": {
+			blocks:        rand.Intn(10) + 1,
+			unshipped:     0,    // no unshipped
+			recentlyWrite: true, // recently written
+			removed:       false,
+		},
+		"no blocks recently written": {
+			blocks:        0,
+			recentlyWrite: true, // recently written
+			removed:       false,
+		},
+		"unshipped not recently written": {
+			blocks:        rand.Intn(10) + 1,
+			unshipped:     100,
+			recentlyWrite: false,
+			removed:       true,
+		},
+		"no blocks not recently written": {
+			blocks:        0,
+			recentlyWrite: false,
+			removed:       true,
+		},
+		"no unshipped blocks not recently written": {
+			blocks:        rand.Intn(10) + 1,
+			unshipped:     0,
+			recentlyWrite: false,
+			removed:       true,
+		},
+		"shipping disabled": {
+			shippingOff:   true,
+			blocks:        rand.Intn(10) + 1,
+			unshipped:     100,
+			recentlyWrite: false,
+			removed:       true,
+		},
+	}
+
+	for name, test := range tests {
+
+		t.Run(name, func(t *testing.T) {
+			user := strings.Replace(name, " ", "_", -1)
+
+			// Create a new user database. Do this instead of getOrCreateTSDB to avoid starting a shipper routine.
+			db := &UserTSDB{}
+			udir := ingester.cfg.TSDBConfig.BlocksDir(user)
+
+			var err error
+			db.DB, err = tsdb.Open(udir, util.Logger, nil, &tsdb.Options{
+				RetentionDuration: uint64(ingester.cfg.TSDBConfig.Retention / time.Millisecond),
+				BlockRanges:       ingester.cfg.TSDBConfig.BlockRanges.ToMillisecondRanges(),
+				NoLockfile:        true,
+			})
+			require.NoError(t, err)
+
+			ingester.userStatesMtx.Lock()
+			ingester.TSDBState.dbs[user] = db
+			ingester.userStatesMtx.Unlock()
+
+			if test.shippingOff {
+				ingester.cfg.TSDBConfig.ShipInterval = 0
+				db.shipperCancel = nil
+				db.shipperDone = nil
+			} else {
+				ingester.cfg.TSDBConfig.ShipInterval = 30 * time.Minute
+				_, cancel := context.WithCancel(context.Background())
+				db.shipperCancel = cancel
+				db.shipperDone = &sync.WaitGroup{}
+			}
+
+			// Create the on-disk tsdb
+			createTSDB(t, dir, []*userTSDB{
+				{
+					userID:      user,
+					shipPercent: 100 - test.unshipped,
+					numBlocks:   test.blocks,
+					meta: &shipper.Meta{
+						Version: shipper.MetaVersion1,
+					},
+				},
+			})
+
+			// Simulate how long the TSDB head has been empty
+			if test.recentlyWrite {
+				db.firstEmpty = time.Now()
+			} else {
+				db.firstEmpty = time.Now().Add(-1 * time.Hour)
+			}
+
+			stop := make(chan struct{})
+			require.NoError(t, ingester.closeStaleTSDBs(stop))
+
+			// Check whether the user directory was removed
+			_, err = ioutil.ReadDir(filepath.Join(dir, user))
+			if test.removed {
+				require.True(t, os.IsNotExist(err))
+			} else {
+				require.NoError(t, err)
+			}
+		})
+	}
+}
diff --git a/pkg/ingester/transfer.go b/pkg/ingester/transfer.go
index 8f0129f32da..5bd8ca60318 100644
--- a/pkg/ingester/transfer.go
+++ b/pkg/ingester/transfer.go
@@ -544,7 +544,7 @@ func (i *Ingester) v2TransferOut(ctx context.Context) error {
 		go func(db *tsdb.DB) {
 			defer wg.Done()
 			db.DisableCompactions()
-		}(db)
+		}(db.DB)
 	}
 	i.userStatesMtx.RUnlock()
 
@@ -619,33 +619,48 @@ func unshippedBlocks(dir string) (map[string][]string, error) {
 	blocks := make(map[string][]string, len(userIDs))
 	for _, user := range userIDs {
-		userID := user.Name()
-		blocks[userID] = []string{} // seed the map with the userID to ensure we xfer the WAL, even if all blocks are shipped
-
-		blockIDs, err := ioutil.ReadDir(filepath.Join(dir, userID))
+		blks, err := unshippedUserBlocks(filepath.Join(dir, user.Name()))
 		if err != nil {
-			return nil, err
+			continue
 		}
 
-		m, err := shipper.ReadMetaFile(filepath.Join(dir, userID))
-		if err != nil {
+		blocks[user.Name()] = blks
+	}
+
+	return blocks, nil
+}
+
+// unshippedUserBlocks returns the ULIDs of blocks that have not yet been shipped for a given user.
+func unshippedUserBlocks(dir string) ([]string, error) {
+	blockIDs, err := ioutil.ReadDir(dir)
+	if err != nil {
+		return nil, err
+	}
+
+	m, err := shipper.ReadMetaFile(dir)
+	if err != nil {
+		// If the cleanup function runs before the shipper has had a chance to run, there won't be a meta file written yet.
+		// So we can treat it as an "empty" meta file and assume no blocks have been shipped.
+		if !os.IsNotExist(err) {
 			return nil, err
 		}
+		m = &shipper.Meta{}
+	}
 
-		shipped := make(map[string]bool)
-		for _, u := range m.Uploaded {
-			shipped[u.String()] = true
-		}
+	shipped := make(map[string]bool)
+	for _, u := range m.Uploaded {
+		shipped[u.String()] = true
+	}
 
-		for _, blockID := range blockIDs {
-			_, err := ulid.Parse(blockID.Name())
-			if err != nil {
-				continue
-			}
+	blocks := []string{}
+	for _, blockID := range blockIDs {
+		_, err := ulid.Parse(blockID.Name())
+		if err != nil {
+			continue
+		}
 
-			if _, ok := shipped[blockID.Name()]; !ok {
-				blocks[userID] = append(blocks[userID], blockID.Name())
-			}
+		if _, ok := shipped[blockID.Name()]; !ok {
+			blocks = append(blocks, blockID.Name())
 		}
 	}
diff --git a/pkg/storage/tsdb/config.go b/pkg/storage/tsdb/config.go
index b887039a945..76ae50c6ceb 100644
--- a/pkg/storage/tsdb/config.go
+++ b/pkg/storage/tsdb/config.go
@@ -36,6 +36,7 @@ type Config struct {
 	ShipInterval time.Duration     `yaml:"ship_interval"`
 	Backend      string            `yaml:"backend"`
 	BucketStore  BucketStoreConfig `yaml:"bucket_store"`
+	MaxStaleAge  time.Duration     `yaml:"max_stale_age"`
 
 	// MaxTSDBOpeningConcurrencyOnStartup limits the number of concurrently opening TSDB's during startup
 	MaxTSDBOpeningConcurrencyOnStartup int `yaml:"max_tsdb_opening_concurrency_on_startup"`
@@ -91,6 +92,7 @@ func (cfg *Config) RegisterFlags(f *flag.FlagSet) {
 	f.DurationVar(&cfg.ShipInterval, "experimental.tsdb.ship-interval", 30*time.Second, "the frequency at which tsdb blocks are scanned for shipping. 0 means shipping is disabled.")
 	f.StringVar(&cfg.Backend, "experimental.tsdb.backend", "s3", "TSDB storage backend to use")
 	f.IntVar(&cfg.MaxTSDBOpeningConcurrencyOnStartup, "experimental.tsdb.max-tsdb-opening-concurrency-on-startup", 10, "limit the number of concurrently opening TSDB's on startup")
+	f.DurationVar(&cfg.MaxStaleAge, "experimental.tsdb.max-stale-age", time.Hour, "The maximum duration a TSDB stays open after it stops receiving writes. TSDBs idle for longer than this are closed and their local directory removed.")
 }
 
 // Validate the config
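
To see how `--experimental.tsdb.max-stale-age` interacts with the `firstEmpty` field added to `UserTSDB`, the standalone Go sketch below reproduces only the staleness decision from `shouldCloseTSDB`. It is illustrative, not part of the patch: the package name, the `staleTracker` type, and the simplified signature are assumptions made to keep the example self-contained, and the real function additionally refuses to close a TSDB while unshipped blocks remain on disk (when shipping is enabled).

package main

import (
	"fmt"
	"time"
)

// staleTracker mirrors the firstEmpty bookkeeping the patch adds to UserTSDB:
// firstEmpty records when the head was first observed empty and is reset to the
// zero time whenever the head holds series again.
type staleTracker struct {
	firstEmpty time.Time
}

// shouldClose reproduces the core decision of shouldCloseTSDB, minus the
// shipper checks: a TSDB becomes a close candidate only once its head has
// stayed empty for longer than maxStaleAge.
func (s *staleTracker) shouldClose(headSeries uint64, now time.Time, maxStaleAge time.Duration) bool {
	if headSeries != 0 {
		s.firstEmpty = time.Time{} // still receiving data, reset the clock
		return false
	}
	if s.firstEmpty.IsZero() {
		s.firstEmpty = now // first time the head is seen empty
	}
	return now.Sub(s.firstEmpty) > maxStaleAge
}

func main() {
	var tr staleTracker
	start := time.Now()

	fmt.Println(tr.shouldClose(0, start, time.Hour))                     // false: head just became empty
	fmt.Println(tr.shouldClose(0, start.Add(30*time.Minute), time.Hour)) // false: still inside max-stale-age
	fmt.Println(tr.shouldClose(5, start.Add(45*time.Minute), time.Hour)) // false: a write arrived, clock resets
	fmt.Println(tr.shouldClose(0, start.Add(3*time.Hour), time.Hour))    // false: empty again, window restarts
	fmt.Println(tr.shouldClose(0, start.Add(5*time.Hour), time.Hour))    // true: empty for longer than one hour
}

Any write that puts series back into the head resets the window, so with the default of one hour a tenant's TSDB is only closed after its head has stayed empty for more than an hour across consecutive cleanup passes (the cleanup loop in the patch runs once per minute).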