From 7b4808715553657d38633a39026a7e4de14a5ba0 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Peter=20S=CC=8Ctibrany=CC=81?= Date: Wed, 29 Jan 2020 13:29:26 +0100 Subject: [PATCH 01/21] Run local compactions on our own schedule and goroutines. MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit Signed-off-by: Peter Štibraný --- pkg/ingester/ingester_v2.go | 80 ++++++++++++++++++++++++++++++++++++- pkg/storage/tsdb/config.go | 13 ++++++ 2 files changed, 92 insertions(+), 1 deletion(-) diff --git a/pkg/ingester/ingester_v2.go b/pkg/ingester/ingester_v2.go index 7f186f93c9b..1b7ea372e72 100644 --- a/pkg/ingester/ingester_v2.go +++ b/pkg/ingester/ingester_v2.go @@ -129,6 +129,11 @@ func NewV2(cfg Config, clientConfig client.Config, limits *validation.Overrides, i.done.Add(1) go i.updateLoop() + if i.cfg.TSDBConfig.CompactionConcurrency > 0 && i.cfg.TSDBConfig.CompactionInterval > 0 { + i.done.Add(1) + go i.compactionLoop() + } + return i, nil } @@ -592,6 +597,7 @@ func (i *Ingester) createTSDB(userID string) (*userTSDB, error) { if err != nil { return nil, err } + db.DisableCompactions() // we will compact on our own schedule userDB := &userTSDB{ DB: db, @@ -818,11 +824,83 @@ func (i *Ingester) shipBlocks() { }() } +sendLoop: for _, userID := range i.getTSDBUsers() { - workersChan <- userID + select { + case workersChan <- userID: + // ok + case <-i.quit: + // don't start new shippings + break sendLoop + } } close(workersChan) // Wait until all workers completed. wg.Wait() } + +func (i *Ingester) compactionLoop() { + defer i.done.Done() + + ticker := time.NewTicker(i.cfg.TSDBConfig.CompactionInterval) + defer ticker.Stop() + + for { + select { + case <-ticker.C: + i.compactBlocks() + + case <-i.quit: + return + } + } +} + +func (i *Ingester) compactBlocks() { + // Don't compact TSDB blocks while JOINING or LEAVING, as there may be ongoing blocks transfers. + if ingesterState := i.lifecycler.GetState(); ingesterState == ring.JOINING || ingesterState == ring.LEAVING { + level.Info(util.Logger).Log("msg", "TSDB blocks compaction has been skipped because of the current ingester state", "state", ingesterState) + return + } + + wg := sync.WaitGroup{} + ch := make(chan string) + + for ix := 0; ix < i.cfg.TSDBConfig.CompactionConcurrency; ix++ { + wg.Add(1) + go func() { + defer wg.Done() + + for userID := range ch { + userDB := i.getTSDB(userID) + if userDB == nil { + continue + } + + err := userDB.Compact() + if err != nil { + level.Warn(util.Logger).Log("msg", "TSDB blocks compaction for user has failed", "user", userID, "err", err) + } else { + level.Debug(util.Logger).Log("msg", "TSDB blocks compaction completed successfully", "user", userID) + } + } + }() + } + +sendLoop: + for _, userID := range i.getTSDBUsers() { + select { + case ch <- userID: + // ok + case <-i.quit: + // don't start new compactions. + break sendLoop + } + } + + close(ch) + + // wait for ongoing compactions to finish. 
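+	// Closing the channel (above) lets each worker's range loop exit once the
+	// queued users are drained; Wait() then blocks until every in-flight
+	// Compact() call has returned, so shutdown never interrupts a compaction.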
+ wg.Wait() +} diff --git a/pkg/storage/tsdb/config.go b/pkg/storage/tsdb/config.go index 2594729f692..021086a6670 100644 --- a/pkg/storage/tsdb/config.go +++ b/pkg/storage/tsdb/config.go @@ -44,6 +44,9 @@ type Config struct { Backend string `yaml:"backend"` BucketStore BucketStoreConfig `yaml:"bucket_store"` + CompactionInterval time.Duration `yaml:"compaction_interval"` + CompactionConcurrency int `yaml:"compaction_concurrency"` + // MaxTSDBOpeningConcurrencyOnStartup limits the number of concurrently opening TSDB's during startup MaxTSDBOpeningConcurrencyOnStartup int `yaml:"max_tsdb_opening_concurrency_on_startup"` @@ -108,6 +111,8 @@ func (cfg *Config) RegisterFlags(f *flag.FlagSet) { f.IntVar(&cfg.ShipConcurrency, "experimental.tsdb.ship-concurrency", 10, "Maximum number of tenants concurrently shipping blocks to the storage.") f.StringVar(&cfg.Backend, "experimental.tsdb.backend", "s3", "TSDB storage backend to use") f.IntVar(&cfg.MaxTSDBOpeningConcurrencyOnStartup, "experimental.tsdb.max-tsdb-opening-concurrency-on-startup", 10, "limit the number of concurrently opening TSDB's on startup") + f.DurationVar(&cfg.CompactionInterval, "experimental.tsdb.compaction-interval", 2*time.Hour, "How frequently should locally-stored TSDB blocks be compacted") + f.IntVar(&cfg.CompactionConcurrency, "experimental.tsdb.compaction-concurrency", 5, "Maximum number of tenants concurrently compacting locally-stored TSDB blocks") } // Validate the config @@ -120,6 +125,14 @@ func (cfg *Config) Validate() error { return errInvalidShipConcurrency } + if cfg.CompactionInterval <= 0 { + return errors.New("invalid TSDB compaction interval") + } + + if cfg.CompactionConcurrency <= 0 { + return errors.New("invalid TSDB compaction concurrency") + } + return nil } From e5bd9fc38edc9401abb4623179e32c6474ecc96b Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Peter=20S=CC=8Ctibrany=CC=81?= Date: Fri, 7 Feb 2020 10:20:58 +0100 Subject: [PATCH 02/21] Fix and add some tests to validation. 
MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit Signed-off-by: Peter Štibraný --- pkg/storage/tsdb/config.go | 14 ++++++++------ pkg/storage/tsdb/config_test.go | 23 +++++++++++++++++++++++ 2 files changed, 31 insertions(+), 6 deletions(-) diff --git a/pkg/storage/tsdb/config.go b/pkg/storage/tsdb/config.go index 021086a6670..bf84cf733d5 100644 --- a/pkg/storage/tsdb/config.go +++ b/pkg/storage/tsdb/config.go @@ -30,8 +30,10 @@ const ( // Validation errors var ( - errUnsupportedBackend = errors.New("unsupported TSDB storage backend") - errInvalidShipConcurrency = errors.New("invalid TSDB ship concurrency") + errUnsupportedBackend = errors.New("unsupported TSDB storage backend") + errInvalidShipConcurrency = errors.New("invalid TSDB ship concurrency") + errInvalidCompactionInterval = errors.New("invalid TSDB compaction interval") + errInvalidCompactionConcurrency = errors.New("invalid TSDB compaction concurrency") ) // Config holds the config information for TSDB storage @@ -125,12 +127,12 @@ func (cfg *Config) Validate() error { return errInvalidShipConcurrency } - if cfg.CompactionInterval <= 0 { - return errors.New("invalid TSDB compaction interval") + if cfg.CompactionInterval < 0 { + return errInvalidCompactionInterval } - if cfg.CompactionConcurrency <= 0 { - return errors.New("invalid TSDB compaction concurrency") + if cfg.CompactionInterval > 0 && cfg.CompactionConcurrency <= 0 { + return errInvalidCompactionConcurrency } return nil diff --git a/pkg/storage/tsdb/config_test.go b/pkg/storage/tsdb/config_test.go index c9ad2af9ac4..9598c8dcda7 100644 --- a/pkg/storage/tsdb/config_test.go +++ b/pkg/storage/tsdb/config_test.go @@ -49,6 +49,29 @@ func TestConfig_Validate(t *testing.T) { }, expectedErr: nil, }, + "should fail on invalid compaction interval": { + config: Config{ + Backend: "s3", + CompactionInterval: -1 * time.Minute, + }, + expectedErr: errInvalidCompactionInterval, + }, + "should fail on invalid compaction concurrency": { + config: Config{ + Backend: "s3", + CompactionInterval: time.Minute, + CompactionConcurrency: 0, + }, + expectedErr: errInvalidCompactionConcurrency, + }, + "should pass on on valid compaction config": { + config: Config{ + Backend: "s3", + CompactionInterval: time.Minute, + CompactionConcurrency: 10, + }, + expectedErr: nil, + }, } for testName, testData := range tests { From 178f59332883b3195d96e024449d269325501660 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Peter=20S=CC=8Ctibrany=CC=81?= Date: Fri, 7 Feb 2020 10:21:53 +0100 Subject: [PATCH 03/21] Refactor common code into runConcurrentUserWorkers method. MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit Signed-off-by: Peter Štibraný --- pkg/ingester/ingester_v2.go | 96 +++++++++++++++---------------------- 1 file changed, 38 insertions(+), 58 deletions(-) diff --git a/pkg/ingester/ingester_v2.go b/pkg/ingester/ingester_v2.go index 1b7ea372e72..497baa8bab2 100644 --- a/pkg/ingester/ingester_v2.go +++ b/pkg/ingester/ingester_v2.go @@ -791,53 +791,27 @@ func (i *Ingester) shipBlocks() { return } - // Create a pool of workers which will synchronize blocks. The pool size - // is limited in order to avoid to concurrently sync a lot of tenants in - // a large cluster. - workersChan := make(chan string) - wg := &sync.WaitGroup{} - wg.Add(i.cfg.TSDBConfig.ShipConcurrency) - - for j := 0; j < i.cfg.TSDBConfig.ShipConcurrency; j++ { - go func() { - defer wg.Done() - - for userID := range workersChan { - // Get the user's DB. 
If the user doesn't exist, we skip it. - userDB := i.getTSDB(userID) - if userDB == nil || userDB.shipper == nil { - continue - } - - // Skip if the shipper context has been canceled. - if userDB.shipperCtx.Err() != nil { - continue - } - - // Run the shipper's Sync() to upload unshipped blocks. - if uploaded, err := userDB.shipper.Sync(userDB.shipperCtx); err != nil { - level.Warn(util.Logger).Log("msg", "shipper failed to synchronize TSDB blocks with the storage", "user", userID, "uploaded", uploaded, "err", err) - } else { - level.Debug(util.Logger).Log("msg", "shipper successfully synchronized TSDB blocks with storage", "user", userID, "uploaded", uploaded) - } - } - }() - } + // Number of concurrent workers is limited in order to avoid to concurrently sync a lot + // of tenants in a large cluster. + i.runConcurrentUserWorkers(i.cfg.TSDBConfig.ShipConcurrency, func(userID string) { + // Get the user's DB. If the user doesn't exist, we skip it. + userDB := i.getTSDB(userID) + if userDB == nil || userDB.shipper == nil { + return + } -sendLoop: - for _, userID := range i.getTSDBUsers() { - select { - case workersChan <- userID: - // ok - case <-i.quit: - // don't start new shippings - break sendLoop + // Skip if the shipper context has been canceled. + if userDB.shipperCtx.Err() != nil { + return } - } - close(workersChan) - // Wait until all workers completed. - wg.Wait() + // Run the shipper's Sync() to upload unshipped blocks. + if uploaded, err := userDB.shipper.Sync(userDB.shipperCtx); err != nil { + level.Warn(util.Logger).Log("msg", "shipper failed to synchronize TSDB blocks with the storage", "user", userID, "uploaded", uploaded, "err", err) + } else { + level.Debug(util.Logger).Log("msg", "shipper successfully synchronized TSDB blocks with storage", "user", userID, "uploaded", uploaded) + } + }) } func (i *Ingester) compactionLoop() { @@ -864,26 +838,32 @@ func (i *Ingester) compactBlocks() { return } + i.runConcurrentUserWorkers(i.cfg.TSDBConfig.CompactionConcurrency, func(userID string) { + userDB := i.getTSDB(userID) + if userDB == nil { + return + } + + err := userDB.Compact() + if err != nil { + level.Warn(util.Logger).Log("msg", "TSDB blocks compaction for user has failed", "user", userID, "err", err) + } else { + level.Debug(util.Logger).Log("msg", "TSDB blocks compaction completed successfully", "user", userID) + } + }) +} + +func (i *Ingester) runConcurrentUserWorkers(concurrency int, userFunc func(userID string)) { wg := sync.WaitGroup{} ch := make(chan string) - for ix := 0; ix < i.cfg.TSDBConfig.CompactionConcurrency; ix++ { + for ix := 0; ix < concurrency; ix++ { wg.Add(1) go func() { defer wg.Done() for userID := range ch { - userDB := i.getTSDB(userID) - if userDB == nil { - continue - } - - err := userDB.Compact() - if err != nil { - level.Warn(util.Logger).Log("msg", "TSDB blocks compaction for user has failed", "user", userID, "err", err) - } else { - level.Debug(util.Logger).Log("msg", "TSDB blocks compaction completed successfully", "user", userID) - } + userFunc(userID) } }() } @@ -894,13 +874,13 @@ sendLoop: case ch <- userID: // ok case <-i.quit: - // don't start new compactions. + // don't start new tasks. break sendLoop } } close(ch) - // wait for ongoing compactions to finish. + // wait for ongoing workers to finish. wg.Wait() } From 7e9e3827fbae1725fd068111bd1662cf07473033 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Peter=20S=CC=8Ctibrany=CC=81?= Date: Fri, 21 Feb 2020 14:35:48 +0100 Subject: [PATCH 04/21] Tied compaction and shipping together. 
MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit Signed-off-by: Peter Štibraný --- pkg/ingester/ingester_v2.go | 89 ++++++++++---------------------- pkg/ingester/ingester_v2_test.go | 6 +-- pkg/storage/tsdb/config.go | 25 ++++----- pkg/storage/tsdb/config_test.go | 16 ------ 4 files changed, 39 insertions(+), 97 deletions(-) diff --git a/pkg/ingester/ingester_v2.go b/pkg/ingester/ingester_v2.go index 497baa8bab2..eb357cdbfd8 100644 --- a/pkg/ingester/ingester_v2.go +++ b/pkg/ingester/ingester_v2.go @@ -120,18 +120,12 @@ func NewV2(cfg Config, clientConfig client.Config, limits *validation.Overrides, // Now that user states have been created, we can start the lifecycler i.lifecycler.Start() - // Run the blocks shipping in a dedicated go routine. - if i.cfg.TSDBConfig.ShipInterval > 0 { - i.done.Add(1) - go i.shipBlocksLoop() - } - i.done.Add(1) go i.updateLoop() if i.cfg.TSDBConfig.CompactionConcurrency > 0 && i.cfg.TSDBConfig.CompactionInterval > 0 { i.done.Add(1) - go i.compactionLoop() + go i.compactionAndShippingLoop() } return i, nil @@ -617,7 +611,7 @@ func (i *Ingester) createTSDB(userID string) (*userTSDB, error) { } // Create a new shipper for this database - if i.cfg.TSDBConfig.ShipInterval > 0 { + if i.cfg.TSDBConfig.ShippingEnabled { userDB.shipper = shipper.New( util.Logger, tsdbPromReg, @@ -748,10 +742,10 @@ func (i *Ingester) numSeriesInTSDB() float64 { return float64(count) } -func (i *Ingester) shipBlocksLoop() { - // It's important to add the shipper loop to the "done" wait group, +func (i *Ingester) compactionAndShippingLoop() { + // It's important to add this loop to the "done" wait group, // because the blocks transfer should start only once it's guaranteed - // there's no shipping on-going. + // there's no compaction or shipping on-going. defer i.done.Done() // Start a goroutine that will cancel all shipper contexts on ingester @@ -767,13 +761,13 @@ func (i *Ingester) shipBlocksLoop() { } }() - shipTicker := time.NewTicker(i.cfg.TSDBConfig.ShipInterval) - defer shipTicker.Stop() + ticker := time.NewTicker(i.cfg.TSDBConfig.CompactionInterval) + defer ticker.Stop() for { select { - case <-shipTicker.C: - i.shipBlocks() + case <-ticker.C: + i.compactAndShipBlocks() case <-i.quit: return @@ -781,74 +775,45 @@ func (i *Ingester) shipBlocksLoop() { } } -func (i *Ingester) shipBlocks() { - // Do not ship blocks if the ingester is PENDING or JOINING. It's +func (i *Ingester) compactAndShipBlocks() { + // Do not compact or ship blocks if the ingester is PENDING or JOINING. It's // particularly important for the JOINING state because there could // be a blocks transfer in progress (from another ingester) and if we // run the shipper in such state we could end up with race conditions. if ingesterState := i.lifecycler.GetState(); ingesterState == ring.PENDING || ingesterState == ring.JOINING { - level.Info(util.Logger).Log("msg", "TSDB blocks shipping has been skipped because of the current ingester state", "state", ingesterState) + level.Info(util.Logger).Log("msg", "TSDB blocks compaction and shipping has been skipped because of the current ingester state", "state", ingesterState) return } - // Number of concurrent workers is limited in order to avoid to concurrently sync a lot - // of tenants in a large cluster. - i.runConcurrentUserWorkers(i.cfg.TSDBConfig.ShipConcurrency, func(userID string) { - // Get the user's DB. If the user doesn't exist, we skip it. 
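+	// One worker pool now drives both steps for each tenant: compact the TSDB
+	// head first, then ship the resulting blocks, all bounded by the single
+	// CompactionConcurrency limit.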
+ i.runConcurrentUserWorkers(i.cfg.TSDBConfig.CompactionConcurrency, func(userID string) { userDB := i.getTSDB(userID) - if userDB == nil || userDB.shipper == nil { - return - } - - // Skip if the shipper context has been canceled. - if userDB.shipperCtx.Err() != nil { + if userDB == nil { return } - // Run the shipper's Sync() to upload unshipped blocks. - if uploaded, err := userDB.shipper.Sync(userDB.shipperCtx); err != nil { - level.Warn(util.Logger).Log("msg", "shipper failed to synchronize TSDB blocks with the storage", "user", userID, "uploaded", uploaded, "err", err) + err := userDB.Compact() + if err != nil { + level.Warn(util.Logger).Log("msg", "TSDB blocks compaction for user has failed", "user", userID, "err", err) + // Should we still ship in such case? } else { - level.Debug(util.Logger).Log("msg", "shipper successfully synchronized TSDB blocks with storage", "user", userID, "uploaded", uploaded) + level.Debug(util.Logger).Log("msg", "TSDB blocks compaction completed successfully", "user", userID) } - }) -} -func (i *Ingester) compactionLoop() { - defer i.done.Done() - - ticker := time.NewTicker(i.cfg.TSDBConfig.CompactionInterval) - defer ticker.Stop() - - for { - select { - case <-ticker.C: - i.compactBlocks() - - case <-i.quit: + if userDB.shipper == nil { + // shipping disabled return } - } -} - -func (i *Ingester) compactBlocks() { - // Don't compact TSDB blocks while JOINING or LEAVING, as there may be ongoing blocks transfers. - if ingesterState := i.lifecycler.GetState(); ingesterState == ring.JOINING || ingesterState == ring.LEAVING { - level.Info(util.Logger).Log("msg", "TSDB blocks compaction has been skipped because of the current ingester state", "state", ingesterState) - return - } - i.runConcurrentUserWorkers(i.cfg.TSDBConfig.CompactionConcurrency, func(userID string) { - userDB := i.getTSDB(userID) - if userDB == nil { + // Skip if the shipper context has been canceled. + if userDB.shipperCtx.Err() != nil { return } - err := userDB.Compact() - if err != nil { - level.Warn(util.Logger).Log("msg", "TSDB blocks compaction for user has failed", "user", userID, "err", err) + // Run the shipper's Sync() to upload unshipped blocks. 
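+		// Sync is incremental: the shipper remembers already-uploaded blocks in
+		// its meta file and skips them, so only blocks produced by the compaction
+		// above are uploaded on this pass.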
+ if uploaded, err := userDB.shipper.Sync(userDB.shipperCtx); err != nil { + level.Warn(util.Logger).Log("msg", "shipper failed to synchronize TSDB blocks with the storage", "user", userID, "uploaded", uploaded, "err", err) } else { - level.Debug(util.Logger).Log("msg", "TSDB blocks compaction completed successfully", "user", userID) + level.Debug(util.Logger).Log("msg", "shipper successfully synchronized TSDB blocks with storage", "user", userID, "uploaded", uploaded) } }) } diff --git a/pkg/ingester/ingester_v2_test.go b/pkg/ingester/ingester_v2_test.go index feabef64030..76b56f0c55a 100644 --- a/pkg/ingester/ingester_v2_test.go +++ b/pkg/ingester/ingester_v2_test.go @@ -1016,10 +1016,10 @@ func TestIngester_v2LoadTSDBOnStartup(t *testing.T) { } } -func TestIngester_shipBlocks(t *testing.T) { +func TestIngester_compactAndShipBlocks(t *testing.T) { cfg := defaultIngesterTestConfig() cfg.LifecyclerConfig.JoinAfter = 0 - cfg.TSDBConfig.ShipConcurrency = 2 + cfg.TSDBConfig.CompactionConcurrency = 2 // Create ingester i, cleanup, err := newIngesterMockWithTSDBStorage(cfg, nil) @@ -1047,7 +1047,7 @@ func TestIngester_shipBlocks(t *testing.T) { } // Ship blocks and assert on the mocked shipper - i.shipBlocks() + i.compactAndShipBlocks() for _, m := range mocks { m.AssertNumberOfCalls(t, "Sync", 1) diff --git a/pkg/storage/tsdb/config.go b/pkg/storage/tsdb/config.go index bf84cf733d5..95bf8a39aa8 100644 --- a/pkg/storage/tsdb/config.go +++ b/pkg/storage/tsdb/config.go @@ -31,23 +31,21 @@ const ( // Validation errors var ( errUnsupportedBackend = errors.New("unsupported TSDB storage backend") - errInvalidShipConcurrency = errors.New("invalid TSDB ship concurrency") errInvalidCompactionInterval = errors.New("invalid TSDB compaction interval") errInvalidCompactionConcurrency = errors.New("invalid TSDB compaction concurrency") ) // Config holds the config information for TSDB storage type Config struct { - Dir string `yaml:"dir"` - BlockRanges DurationList `yaml:"block_ranges_period"` - Retention time.Duration `yaml:"retention_period"` - ShipInterval time.Duration `yaml:"ship_interval"` - ShipConcurrency int `yaml:"ship_concurrency"` - Backend string `yaml:"backend"` - BucketStore BucketStoreConfig `yaml:"bucket_store"` + Dir string `yaml:"dir"` + BlockRanges DurationList `yaml:"block_ranges_period"` + Retention time.Duration `yaml:"retention_period"` + Backend string `yaml:"backend"` + BucketStore BucketStoreConfig `yaml:"bucket_store"` CompactionInterval time.Duration `yaml:"compaction_interval"` CompactionConcurrency int `yaml:"compaction_concurrency"` + ShippingEnabled bool `yaml:"shipping_enabled"` // MaxTSDBOpeningConcurrencyOnStartup limits the number of concurrently opening TSDB's during startup MaxTSDBOpeningConcurrencyOnStartup int `yaml:"max_tsdb_opening_concurrency_on_startup"` @@ -109,12 +107,11 @@ func (cfg *Config) RegisterFlags(f *flag.FlagSet) { f.StringVar(&cfg.Dir, "experimental.tsdb.dir", "tsdb", "directory to place all TSDB's into") f.Var(&cfg.BlockRanges, "experimental.tsdb.block-ranges-period", "comma separated list of TSDB block ranges in time.Duration format") f.DurationVar(&cfg.Retention, "experimental.tsdb.retention-period", 6*time.Hour, "TSDB block retention") - f.DurationVar(&cfg.ShipInterval, "experimental.tsdb.ship-interval", 1*time.Minute, "How frequently the TSDB blocks are scanned and new ones are shipped to the storage. 
0 means shipping is disabled.") - f.IntVar(&cfg.ShipConcurrency, "experimental.tsdb.ship-concurrency", 10, "Maximum number of tenants concurrently shipping blocks to the storage.") f.StringVar(&cfg.Backend, "experimental.tsdb.backend", "s3", "TSDB storage backend to use") f.IntVar(&cfg.MaxTSDBOpeningConcurrencyOnStartup, "experimental.tsdb.max-tsdb-opening-concurrency-on-startup", 10, "limit the number of concurrently opening TSDB's on startup") - f.DurationVar(&cfg.CompactionInterval, "experimental.tsdb.compaction-interval", 2*time.Hour, "How frequently should locally-stored TSDB blocks be compacted") - f.IntVar(&cfg.CompactionConcurrency, "experimental.tsdb.compaction-concurrency", 5, "Maximum number of tenants concurrently compacting locally-stored TSDB blocks") + f.DurationVar(&cfg.CompactionInterval, "experimental.tsdb.compaction-interval", 2*time.Hour, "How frequently should locally-stored TSDB blocks be compacted (and shipped)") + f.IntVar(&cfg.CompactionConcurrency, "experimental.tsdb.compaction-concurrency", 5, "Maximum number of tenants concurrently compacting and shipping locally-stored TSDB blocks") + f.BoolVar(&cfg.ShippingEnabled, "experimental.tsdb.shipping-enabled", true, "Ship TSDB blocks to storage after compaction") } // Validate the config @@ -123,10 +120,6 @@ func (cfg *Config) Validate() error { return errUnsupportedBackend } - if cfg.ShipInterval > 0 && cfg.ShipConcurrency <= 0 { - return errInvalidShipConcurrency - } - if cfg.CompactionInterval < 0 { return errInvalidCompactionInterval } diff --git a/pkg/storage/tsdb/config_test.go b/pkg/storage/tsdb/config_test.go index 9598c8dcda7..ed7ee30e943 100644 --- a/pkg/storage/tsdb/config_test.go +++ b/pkg/storage/tsdb/config_test.go @@ -33,22 +33,6 @@ func TestConfig_Validate(t *testing.T) { }, expectedErr: errUnsupportedBackend, }, - "should fail on invalid ship concurrency": { - config: Config{ - Backend: "s3", - ShipInterval: time.Minute, - ShipConcurrency: 0, - }, - expectedErr: errInvalidShipConcurrency, - }, - "should pass on invalid ship concurrency but shipping is disabled": { - config: Config{ - Backend: "s3", - ShipInterval: 0, - ShipConcurrency: 0, - }, - expectedErr: nil, - }, "should fail on invalid compaction interval": { config: Config{ Backend: "s3", From 6440781ccf94ee6e1d244721cb87d40c8f500ac9 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Peter=20S=CC=8Ctibrany=CC=81?= Date: Fri, 21 Feb 2020 14:38:11 +0100 Subject: [PATCH 05/21] Updated CHANGELOG.md MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit Signed-off-by: Peter Štibraný --- CHANGELOG.md | 1 + 1 file changed, 1 insertion(+) diff --git a/CHANGELOG.md b/CHANGELOG.md index 883e675cf9b..cd520537b09 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -23,6 +23,7 @@ * [CHANGE] Renamed the cache configuration setting `defaul_validity` to `default_validity`. #2140 * [CHANGE] Removed unused /validate_expr endpoint. #2152 * [CHANGE] Updated Prometheus dependency to v2.16.0. This Prometheus version uses Active Query Tracker to limit concurrent queries. In order to keep `-querier.max-concurrent` working, Active Query Tracker is enabled by default, and is configured to store its data to `active-query-tracker` directory (relative to current directory when Cortex started). This can be changed by using `-querier.active-query-tracker-dir` option. Purpose of Active Query Tracker is to log queries that were running when Cortex crashes. This logging happens on next Cortex start. 
#2088 +* [CHANGE] Experimental TSDB: TSDB head compaction interval is now configurable (defaults to 2h). Shipping now happens right after compaction, and shipping interval and concurrency options were removed. Shipping can still be disabled. * [FEATURE] Added a read-only local alertmanager config store using files named corresponding to their tenant id. #2125 * [FEATURE] Added user sub rings to distribute users to a subset of ingesters. #1947 * `--experimental.distributor.user-subring-size` From 1454ae1395605c1ebe9a76624d1e162ab2e08633 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Peter=20S=CC=8Ctibrany=CC=81?= Date: Fri, 21 Feb 2020 14:41:16 +0100 Subject: [PATCH 06/21] Updated CHANGELOG.md MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit Signed-off-by: Peter Štibraný --- CHANGELOG.md | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/CHANGELOG.md b/CHANGELOG.md index cd520537b09..84807c10cee 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -23,7 +23,7 @@ * [CHANGE] Renamed the cache configuration setting `defaul_validity` to `default_validity`. #2140 * [CHANGE] Removed unused /validate_expr endpoint. #2152 * [CHANGE] Updated Prometheus dependency to v2.16.0. This Prometheus version uses Active Query Tracker to limit concurrent queries. In order to keep `-querier.max-concurrent` working, Active Query Tracker is enabled by default, and is configured to store its data to `active-query-tracker` directory (relative to current directory when Cortex started). This can be changed by using `-querier.active-query-tracker-dir` option. Purpose of Active Query Tracker is to log queries that were running when Cortex crashes. This logging happens on next Cortex start. #2088 -* [CHANGE] Experimental TSDB: TSDB head compaction interval is now configurable (defaults to 2h). Shipping now happens right after compaction, and shipping interval and concurrency options were removed. Shipping can still be disabled. +* [CHANGE] Experimental TSDB: TSDB head compaction interval is now configurable (defaults to 2h). Shipping now happens right after compaction, and shipping interval and concurrency options were removed. Shipping can still be disabled. #2172 * [FEATURE] Added a read-only local alertmanager config store using files named corresponding to their tenant id. #2125 * [FEATURE] Added user sub rings to distribute users to a subset of ingesters. #1947 * `--experimental.distributor.user-subring-size` From d422afe705ec403d77d827f556b0fb2194aae891 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Peter=20S=CC=8Ctibrany=CC=81?= Date: Fri, 21 Feb 2020 14:44:40 +0100 Subject: [PATCH 07/21] Updated documentation. MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit Signed-off-by: Peter Štibraný --- docs/operations/blocks-storage.md | 19 +++++++++++-------- 1 file changed, 11 insertions(+), 8 deletions(-) diff --git a/docs/operations/blocks-storage.md b/docs/operations/blocks-storage.md index 592366ef7cc..3be94765b4a 100644 --- a/docs/operations/blocks-storage.md +++ b/docs/operations/blocks-storage.md @@ -114,14 +114,17 @@ tsdb: # CLI flag: -experimental.tsdb.retention-period [ retention_period: | default = 6h] - # How frequently the TSDB blocks are scanned and new ones are shipped to - # the storage. 0 means shipping is disabled. - # CLI flag: -experimental.tsdb.ship-interval - [ship_interval: | default = 1m] - - # Maximum number of tenants concurrently shipping blocks to the storage. 
- # CLI flag: -experimental.tsdb.ship-concurrency - [ship_concurrency: | default = 10] + # How frequently are TSDB head compacted, and subsequently shipped to the storage. + # CLI flag: -experimental.tsdb.compaction-interval + [compaction_interval: | default = 2h] + + # Maximum number of tenants concurrently compacting and shipping blocks to the storage. + # CLI flag: -experimental.tsdb.compaction-concurrency + [compaction_concurrency: | default = 5] + + # Is shipping of blocks to the storage enabled? + # CLI flag: -experimental.tsdb.shipping-enabled + [shipping_enabled: | default true] # The bucket store configuration applies to queriers and configure how queriers # iteract with the long-term storage backend. From 4fe63a1d6a5920a9c4b807d8df88e87d32083673 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Peter=20S=CC=8Ctibrany=CC=81?= Date: Fri, 21 Feb 2020 14:50:45 +0100 Subject: [PATCH 08/21] Merge blocks. MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit Signed-off-by: Peter Štibraný --- pkg/storage/tsdb/config.go | 17 ++++++++--------- 1 file changed, 8 insertions(+), 9 deletions(-) diff --git a/pkg/storage/tsdb/config.go b/pkg/storage/tsdb/config.go index 95bf8a39aa8..b4546118c97 100644 --- a/pkg/storage/tsdb/config.go +++ b/pkg/storage/tsdb/config.go @@ -37,15 +37,14 @@ var ( // Config holds the config information for TSDB storage type Config struct { - Dir string `yaml:"dir"` - BlockRanges DurationList `yaml:"block_ranges_period"` - Retention time.Duration `yaml:"retention_period"` - Backend string `yaml:"backend"` - BucketStore BucketStoreConfig `yaml:"bucket_store"` - - CompactionInterval time.Duration `yaml:"compaction_interval"` - CompactionConcurrency int `yaml:"compaction_concurrency"` - ShippingEnabled bool `yaml:"shipping_enabled"` + Dir string `yaml:"dir"` + BlockRanges DurationList `yaml:"block_ranges_period"` + Retention time.Duration `yaml:"retention_period"` + Backend string `yaml:"backend"` + BucketStore BucketStoreConfig `yaml:"bucket_store"` + CompactionInterval time.Duration `yaml:"compaction_interval"` + CompactionConcurrency int `yaml:"compaction_concurrency"` + ShippingEnabled bool `yaml:"shipping_enabled"` // MaxTSDBOpeningConcurrencyOnStartup limits the number of concurrently opening TSDB's during startup MaxTSDBOpeningConcurrencyOnStartup int `yaml:"max_tsdb_opening_concurrency_on_startup"` From 449f4194780a226925d6926ae801cf5fda4c6d4a Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Peter=20S=CC=8Ctibrany=CC=81?= Date: Mon, 24 Feb 2020 09:23:51 +0100 Subject: [PATCH 09/21] Revert "Tied compaction and shipping together." MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit This reverts commit ac372e9f065f00e909f67d7c3b3557e2974f311c. Signed-off-by: Peter Štibraný --- pkg/ingester/ingester_v2.go | 89 ++++++++++++++++++++++---------- pkg/ingester/ingester_v2_test.go | 6 +-- pkg/storage/tsdb/config.go | 15 ++++-- pkg/storage/tsdb/config_test.go | 16 ++++++ 4 files changed, 92 insertions(+), 34 deletions(-) diff --git a/pkg/ingester/ingester_v2.go b/pkg/ingester/ingester_v2.go index eb357cdbfd8..497baa8bab2 100644 --- a/pkg/ingester/ingester_v2.go +++ b/pkg/ingester/ingester_v2.go @@ -120,12 +120,18 @@ func NewV2(cfg Config, clientConfig client.Config, limits *validation.Overrides, // Now that user states have been created, we can start the lifecycler i.lifecycler.Start() + // Run the blocks shipping in a dedicated go routine. 
+ if i.cfg.TSDBConfig.ShipInterval > 0 { + i.done.Add(1) + go i.shipBlocksLoop() + } + i.done.Add(1) go i.updateLoop() if i.cfg.TSDBConfig.CompactionConcurrency > 0 && i.cfg.TSDBConfig.CompactionInterval > 0 { i.done.Add(1) - go i.compactionAndShippingLoop() + go i.compactionLoop() } return i, nil @@ -611,7 +617,7 @@ func (i *Ingester) createTSDB(userID string) (*userTSDB, error) { } // Create a new shipper for this database - if i.cfg.TSDBConfig.ShippingEnabled { + if i.cfg.TSDBConfig.ShipInterval > 0 { userDB.shipper = shipper.New( util.Logger, tsdbPromReg, @@ -742,10 +748,10 @@ func (i *Ingester) numSeriesInTSDB() float64 { return float64(count) } -func (i *Ingester) compactionAndShippingLoop() { - // It's important to add this loop to the "done" wait group, +func (i *Ingester) shipBlocksLoop() { + // It's important to add the shipper loop to the "done" wait group, // because the blocks transfer should start only once it's guaranteed - // there's no compaction or shipping on-going. + // there's no shipping on-going. defer i.done.Done() // Start a goroutine that will cancel all shipper contexts on ingester @@ -761,13 +767,13 @@ func (i *Ingester) compactionAndShippingLoop() { } }() - ticker := time.NewTicker(i.cfg.TSDBConfig.CompactionInterval) - defer ticker.Stop() + shipTicker := time.NewTicker(i.cfg.TSDBConfig.ShipInterval) + defer shipTicker.Stop() for { select { - case <-ticker.C: - i.compactAndShipBlocks() + case <-shipTicker.C: + i.shipBlocks() case <-i.quit: return @@ -775,32 +781,22 @@ func (i *Ingester) compactionAndShippingLoop() { } } -func (i *Ingester) compactAndShipBlocks() { - // Do not compact or ship blocks if the ingester is PENDING or JOINING. It's +func (i *Ingester) shipBlocks() { + // Do not ship blocks if the ingester is PENDING or JOINING. It's // particularly important for the JOINING state because there could // be a blocks transfer in progress (from another ingester) and if we // run the shipper in such state we could end up with race conditions. if ingesterState := i.lifecycler.GetState(); ingesterState == ring.PENDING || ingesterState == ring.JOINING { - level.Info(util.Logger).Log("msg", "TSDB blocks compaction and shipping has been skipped because of the current ingester state", "state", ingesterState) + level.Info(util.Logger).Log("msg", "TSDB blocks shipping has been skipped because of the current ingester state", "state", ingesterState) return } - i.runConcurrentUserWorkers(i.cfg.TSDBConfig.CompactionConcurrency, func(userID string) { + // Number of concurrent workers is limited in order to avoid to concurrently sync a lot + // of tenants in a large cluster. + i.runConcurrentUserWorkers(i.cfg.TSDBConfig.ShipConcurrency, func(userID string) { + // Get the user's DB. If the user doesn't exist, we skip it. userDB := i.getTSDB(userID) - if userDB == nil { - return - } - - err := userDB.Compact() - if err != nil { - level.Warn(util.Logger).Log("msg", "TSDB blocks compaction for user has failed", "user", userID, "err", err) - // Should we still ship in such case? 
- } else { - level.Debug(util.Logger).Log("msg", "TSDB blocks compaction completed successfully", "user", userID) - } - - if userDB.shipper == nil { - // shipping disabled + if userDB == nil || userDB.shipper == nil { return } @@ -818,6 +814,45 @@ func (i *Ingester) compactAndShipBlocks() { }) } +func (i *Ingester) compactionLoop() { + defer i.done.Done() + + ticker := time.NewTicker(i.cfg.TSDBConfig.CompactionInterval) + defer ticker.Stop() + + for { + select { + case <-ticker.C: + i.compactBlocks() + + case <-i.quit: + return + } + } +} + +func (i *Ingester) compactBlocks() { + // Don't compact TSDB blocks while JOINING or LEAVING, as there may be ongoing blocks transfers. + if ingesterState := i.lifecycler.GetState(); ingesterState == ring.JOINING || ingesterState == ring.LEAVING { + level.Info(util.Logger).Log("msg", "TSDB blocks compaction has been skipped because of the current ingester state", "state", ingesterState) + return + } + + i.runConcurrentUserWorkers(i.cfg.TSDBConfig.CompactionConcurrency, func(userID string) { + userDB := i.getTSDB(userID) + if userDB == nil { + return + } + + err := userDB.Compact() + if err != nil { + level.Warn(util.Logger).Log("msg", "TSDB blocks compaction for user has failed", "user", userID, "err", err) + } else { + level.Debug(util.Logger).Log("msg", "TSDB blocks compaction completed successfully", "user", userID) + } + }) +} + func (i *Ingester) runConcurrentUserWorkers(concurrency int, userFunc func(userID string)) { wg := sync.WaitGroup{} ch := make(chan string) diff --git a/pkg/ingester/ingester_v2_test.go b/pkg/ingester/ingester_v2_test.go index 76b56f0c55a..feabef64030 100644 --- a/pkg/ingester/ingester_v2_test.go +++ b/pkg/ingester/ingester_v2_test.go @@ -1016,10 +1016,10 @@ func TestIngester_v2LoadTSDBOnStartup(t *testing.T) { } } -func TestIngester_compactAndShipBlocks(t *testing.T) { +func TestIngester_shipBlocks(t *testing.T) { cfg := defaultIngesterTestConfig() cfg.LifecyclerConfig.JoinAfter = 0 - cfg.TSDBConfig.CompactionConcurrency = 2 + cfg.TSDBConfig.ShipConcurrency = 2 // Create ingester i, cleanup, err := newIngesterMockWithTSDBStorage(cfg, nil) @@ -1047,7 +1047,7 @@ func TestIngester_compactAndShipBlocks(t *testing.T) { } // Ship blocks and assert on the mocked shipper - i.compactAndShipBlocks() + i.shipBlocks() for _, m := range mocks { m.AssertNumberOfCalls(t, "Sync", 1) diff --git a/pkg/storage/tsdb/config.go b/pkg/storage/tsdb/config.go index b4546118c97..cff6a0bae54 100644 --- a/pkg/storage/tsdb/config.go +++ b/pkg/storage/tsdb/config.go @@ -31,6 +31,7 @@ const ( // Validation errors var ( errUnsupportedBackend = errors.New("unsupported TSDB storage backend") + errInvalidShipConcurrency = errors.New("invalid TSDB ship concurrency") errInvalidCompactionInterval = errors.New("invalid TSDB compaction interval") errInvalidCompactionConcurrency = errors.New("invalid TSDB compaction concurrency") ) @@ -40,11 +41,12 @@ type Config struct { Dir string `yaml:"dir"` BlockRanges DurationList `yaml:"block_ranges_period"` Retention time.Duration `yaml:"retention_period"` + ShipInterval time.Duration `yaml:"ship_interval"` + ShipConcurrency int `yaml:"ship_concurrency"` Backend string `yaml:"backend"` BucketStore BucketStoreConfig `yaml:"bucket_store"` CompactionInterval time.Duration `yaml:"compaction_interval"` CompactionConcurrency int `yaml:"compaction_concurrency"` - ShippingEnabled bool `yaml:"shipping_enabled"` // MaxTSDBOpeningConcurrencyOnStartup limits the number of concurrently opening TSDB's during startup 
MaxTSDBOpeningConcurrencyOnStartup int `yaml:"max_tsdb_opening_concurrency_on_startup"` @@ -106,11 +108,12 @@ func (cfg *Config) RegisterFlags(f *flag.FlagSet) { f.StringVar(&cfg.Dir, "experimental.tsdb.dir", "tsdb", "directory to place all TSDB's into") f.Var(&cfg.BlockRanges, "experimental.tsdb.block-ranges-period", "comma separated list of TSDB block ranges in time.Duration format") f.DurationVar(&cfg.Retention, "experimental.tsdb.retention-period", 6*time.Hour, "TSDB block retention") + f.DurationVar(&cfg.ShipInterval, "experimental.tsdb.ship-interval", 1*time.Minute, "How frequently the TSDB blocks are scanned and new ones are shipped to the storage. 0 means shipping is disabled.") + f.IntVar(&cfg.ShipConcurrency, "experimental.tsdb.ship-concurrency", 10, "Maximum number of tenants concurrently shipping blocks to the storage.") f.StringVar(&cfg.Backend, "experimental.tsdb.backend", "s3", "TSDB storage backend to use") f.IntVar(&cfg.MaxTSDBOpeningConcurrencyOnStartup, "experimental.tsdb.max-tsdb-opening-concurrency-on-startup", 10, "limit the number of concurrently opening TSDB's on startup") - f.DurationVar(&cfg.CompactionInterval, "experimental.tsdb.compaction-interval", 2*time.Hour, "How frequently should locally-stored TSDB blocks be compacted (and shipped)") - f.IntVar(&cfg.CompactionConcurrency, "experimental.tsdb.compaction-concurrency", 5, "Maximum number of tenants concurrently compacting and shipping locally-stored TSDB blocks") - f.BoolVar(&cfg.ShippingEnabled, "experimental.tsdb.shipping-enabled", true, "Ship TSDB blocks to storage after compaction") + f.DurationVar(&cfg.CompactionInterval, "experimental.tsdb.compaction-interval", 2*time.Hour, "How frequently should locally-stored TSDB blocks be compacted") + f.IntVar(&cfg.CompactionConcurrency, "experimental.tsdb.compaction-concurrency", 5, "Maximum number of tenants concurrently compacting locally-stored TSDB blocks") } // Validate the config @@ -119,6 +122,10 @@ func (cfg *Config) Validate() error { return errUnsupportedBackend } + if cfg.ShipInterval > 0 && cfg.ShipConcurrency <= 0 { + return errInvalidShipConcurrency + } + if cfg.CompactionInterval < 0 { return errInvalidCompactionInterval } diff --git a/pkg/storage/tsdb/config_test.go b/pkg/storage/tsdb/config_test.go index ed7ee30e943..9598c8dcda7 100644 --- a/pkg/storage/tsdb/config_test.go +++ b/pkg/storage/tsdb/config_test.go @@ -33,6 +33,22 @@ func TestConfig_Validate(t *testing.T) { }, expectedErr: errUnsupportedBackend, }, + "should fail on invalid ship concurrency": { + config: Config{ + Backend: "s3", + ShipInterval: time.Minute, + ShipConcurrency: 0, + }, + expectedErr: errInvalidShipConcurrency, + }, + "should pass on invalid ship concurrency but shipping is disabled": { + config: Config{ + Backend: "s3", + ShipInterval: 0, + ShipConcurrency: 0, + }, + expectedErr: nil, + }, "should fail on invalid compaction interval": { config: Config{ Backend: "s3", From 0f60c0110765af22a5b63cf10f59d8155200df12 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Peter=20S=CC=8Ctibrany=CC=81?= Date: Mon, 24 Feb 2020 09:24:21 +0100 Subject: [PATCH 10/21] Revert "Updated documentation." MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit This reverts commit 9adb4cb9b4273ac189a3bb0478090966a0db67ec. 
Signed-off-by: Peter Štibraný --- docs/operations/blocks-storage.md | 19 ++++++++----------- 1 file changed, 8 insertions(+), 11 deletions(-) diff --git a/docs/operations/blocks-storage.md b/docs/operations/blocks-storage.md index 3be94765b4a..592366ef7cc 100644 --- a/docs/operations/blocks-storage.md +++ b/docs/operations/blocks-storage.md @@ -114,17 +114,14 @@ tsdb: # CLI flag: -experimental.tsdb.retention-period [ retention_period: | default = 6h] - # How frequently are TSDB head compacted, and subsequently shipped to the storage. - # CLI flag: -experimental.tsdb.compaction-interval - [compaction_interval: | default = 2h] - - # Maximum number of tenants concurrently compacting and shipping blocks to the storage. - # CLI flag: -experimental.tsdb.compaction-concurrency - [compaction_concurrency: | default = 5] - - # Is shipping of blocks to the storage enabled? - # CLI flag: -experimental.tsdb.shipping-enabled - [shipping_enabled: | default true] + # How frequently the TSDB blocks are scanned and new ones are shipped to + # the storage. 0 means shipping is disabled. + # CLI flag: -experimental.tsdb.ship-interval + [ship_interval: | default = 1m] + + # Maximum number of tenants concurrently shipping blocks to the storage. + # CLI flag: -experimental.tsdb.ship-concurrency + [ship_concurrency: | default = 10] # The bucket store configuration applies to queriers and configure how queriers # iteract with the long-term storage backend. From 6fee5b974d031738619ed8f396da033f56e545e7 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Peter=20S=CC=8Ctibrany=CC=81?= Date: Mon, 24 Feb 2020 09:27:05 +0100 Subject: [PATCH 11/21] Fixed documentation. MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit Signed-off-by: Peter Štibraný --- docs/operations/blocks-storage.md | 8 ++++++++ 1 file changed, 8 insertions(+) diff --git a/docs/operations/blocks-storage.md b/docs/operations/blocks-storage.md index 592366ef7cc..c1835ec4146 100644 --- a/docs/operations/blocks-storage.md +++ b/docs/operations/blocks-storage.md @@ -123,6 +123,14 @@ tsdb: # CLI flag: -experimental.tsdb.ship-concurrency [ship_concurrency: | default = 10] + # How frequently are TSDB heads compacted. + # CLI flag: -experimental.tsdb.compaction-interval + [compaction_interval: | default = 2h] + + # Maximum number of tenants concurrently compacting TSDB head. + # CLI flag: -experimental.tsdb.compaction-concurrency + [compaction_concurrency: | default = 5] + # The bucket store configuration applies to queriers and configure how queriers # iteract with the long-term storage backend. bucket_store: From 341a65324f222aac4e73614c993092f57e0dd551 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Peter=20S=CC=8Ctibrany=CC=81?= Date: Mon, 24 Feb 2020 09:28:22 +0100 Subject: [PATCH 12/21] Renamed TSDB head compaction options to use "head" prefix. MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit To avoid confusion with compactor, which also has "compaction_interval" config field. 
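To illustrate the renamed options (the values below are examples only, nothing
here changes defaults), the two settings now read unambiguously side by side
in YAML config:

    tsdb:
      head_compaction_interval: 2h
      head_compaction_concurrency: 5
    compactor:
      compaction_interval: 1h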
Signed-off-by: Peter Štibraný --- docs/operations/blocks-storage.md | 8 ++++---- pkg/ingester/ingester_v2.go | 6 +++--- pkg/storage/tsdb/config.go | 26 +++++++++++++------------- pkg/storage/tsdb/config_test.go | 16 ++++++++-------- 4 files changed, 28 insertions(+), 28 deletions(-) diff --git a/docs/operations/blocks-storage.md b/docs/operations/blocks-storage.md index c1835ec4146..0ad52fbdfee 100644 --- a/docs/operations/blocks-storage.md +++ b/docs/operations/blocks-storage.md @@ -124,12 +124,12 @@ tsdb: [ship_concurrency: | default = 10] # How frequently are TSDB heads compacted. - # CLI flag: -experimental.tsdb.compaction-interval - [compaction_interval: | default = 2h] + # CLI flag: -experimental.tsdb.head-compaction-interval + [head_compaction_interval: | default = 2h] # Maximum number of tenants concurrently compacting TSDB head. - # CLI flag: -experimental.tsdb.compaction-concurrency - [compaction_concurrency: | default = 5] + # CLI flag: -experimental.tsdb.head-compaction-concurrency + [head_compaction_concurrency: | default = 5] # The bucket store configuration applies to queriers and configure how queriers # iteract with the long-term storage backend. diff --git a/pkg/ingester/ingester_v2.go b/pkg/ingester/ingester_v2.go index 497baa8bab2..c5f5c2952f7 100644 --- a/pkg/ingester/ingester_v2.go +++ b/pkg/ingester/ingester_v2.go @@ -129,7 +129,7 @@ func NewV2(cfg Config, clientConfig client.Config, limits *validation.Overrides, i.done.Add(1) go i.updateLoop() - if i.cfg.TSDBConfig.CompactionConcurrency > 0 && i.cfg.TSDBConfig.CompactionInterval > 0 { + if i.cfg.TSDBConfig.HeadCompactionConcurrency > 0 && i.cfg.TSDBConfig.HeadCompactionInterval > 0 { i.done.Add(1) go i.compactionLoop() } @@ -817,7 +817,7 @@ func (i *Ingester) shipBlocks() { func (i *Ingester) compactionLoop() { defer i.done.Done() - ticker := time.NewTicker(i.cfg.TSDBConfig.CompactionInterval) + ticker := time.NewTicker(i.cfg.TSDBConfig.HeadCompactionInterval) defer ticker.Stop() for { @@ -838,7 +838,7 @@ func (i *Ingester) compactBlocks() { return } - i.runConcurrentUserWorkers(i.cfg.TSDBConfig.CompactionConcurrency, func(userID string) { + i.runConcurrentUserWorkers(i.cfg.TSDBConfig.HeadCompactionConcurrency, func(userID string) { userDB := i.getTSDB(userID) if userDB == nil { return diff --git a/pkg/storage/tsdb/config.go b/pkg/storage/tsdb/config.go index cff6a0bae54..22c6dbc03e6 100644 --- a/pkg/storage/tsdb/config.go +++ b/pkg/storage/tsdb/config.go @@ -38,15 +38,15 @@ var ( // Config holds the config information for TSDB storage type Config struct { - Dir string `yaml:"dir"` - BlockRanges DurationList `yaml:"block_ranges_period"` - Retention time.Duration `yaml:"retention_period"` - ShipInterval time.Duration `yaml:"ship_interval"` - ShipConcurrency int `yaml:"ship_concurrency"` - Backend string `yaml:"backend"` - BucketStore BucketStoreConfig `yaml:"bucket_store"` - CompactionInterval time.Duration `yaml:"compaction_interval"` - CompactionConcurrency int `yaml:"compaction_concurrency"` + Dir string `yaml:"dir"` + BlockRanges DurationList `yaml:"block_ranges_period"` + Retention time.Duration `yaml:"retention_period"` + ShipInterval time.Duration `yaml:"ship_interval"` + ShipConcurrency int `yaml:"ship_concurrency"` + Backend string `yaml:"backend"` + BucketStore BucketStoreConfig `yaml:"bucket_store"` + HeadCompactionInterval time.Duration `yaml:"head_compaction_interval"` + HeadCompactionConcurrency int `yaml:"head_compaction_concurrency"` // MaxTSDBOpeningConcurrencyOnStartup limits the number of 
concurrently opening TSDB's during startup MaxTSDBOpeningConcurrencyOnStartup int `yaml:"max_tsdb_opening_concurrency_on_startup"` @@ -112,8 +112,8 @@ func (cfg *Config) RegisterFlags(f *flag.FlagSet) { f.IntVar(&cfg.ShipConcurrency, "experimental.tsdb.ship-concurrency", 10, "Maximum number of tenants concurrently shipping blocks to the storage.") f.StringVar(&cfg.Backend, "experimental.tsdb.backend", "s3", "TSDB storage backend to use") f.IntVar(&cfg.MaxTSDBOpeningConcurrencyOnStartup, "experimental.tsdb.max-tsdb-opening-concurrency-on-startup", 10, "limit the number of concurrently opening TSDB's on startup") - f.DurationVar(&cfg.CompactionInterval, "experimental.tsdb.compaction-interval", 2*time.Hour, "How frequently should locally-stored TSDB blocks be compacted") - f.IntVar(&cfg.CompactionConcurrency, "experimental.tsdb.compaction-concurrency", 5, "Maximum number of tenants concurrently compacting locally-stored TSDB blocks") + f.DurationVar(&cfg.HeadCompactionInterval, "experimental.tsdb.head-compaction-interval", 2*time.Hour, "How frequently should locally-stored TSDB blocks be compacted") + f.IntVar(&cfg.HeadCompactionConcurrency, "experimental.tsdb.head-compaction-concurrency", 5, "Maximum number of tenants concurrently compacting locally-stored TSDB blocks") } // Validate the config @@ -126,11 +126,11 @@ func (cfg *Config) Validate() error { return errInvalidShipConcurrency } - if cfg.CompactionInterval < 0 { + if cfg.HeadCompactionInterval < 0 { return errInvalidCompactionInterval } - if cfg.CompactionInterval > 0 && cfg.CompactionConcurrency <= 0 { + if cfg.HeadCompactionInterval > 0 && cfg.HeadCompactionConcurrency <= 0 { return errInvalidCompactionConcurrency } diff --git a/pkg/storage/tsdb/config_test.go b/pkg/storage/tsdb/config_test.go index 9598c8dcda7..e76f4566364 100644 --- a/pkg/storage/tsdb/config_test.go +++ b/pkg/storage/tsdb/config_test.go @@ -51,24 +51,24 @@ func TestConfig_Validate(t *testing.T) { }, "should fail on invalid compaction interval": { config: Config{ - Backend: "s3", - CompactionInterval: -1 * time.Minute, + Backend: "s3", + HeadCompactionInterval: -1 * time.Minute, }, expectedErr: errInvalidCompactionInterval, }, "should fail on invalid compaction concurrency": { config: Config{ - Backend: "s3", - CompactionInterval: time.Minute, - CompactionConcurrency: 0, + Backend: "s3", + HeadCompactionInterval: time.Minute, + HeadCompactionConcurrency: 0, }, expectedErr: errInvalidCompactionConcurrency, }, "should pass on on valid compaction config": { config: Config{ - Backend: "s3", - CompactionInterval: time.Minute, - CompactionConcurrency: 10, + Backend: "s3", + HeadCompactionInterval: time.Minute, + HeadCompactionConcurrency: 10, }, expectedErr: nil, }, From 16ad22cc511b046408611dd44c1d7c4703419459 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Peter=20S=CC=8Ctibrany=CC=81?= Date: Mon, 24 Feb 2020 10:19:55 +0100 Subject: [PATCH 13/21] Expose compaction metrics. 
MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit Signed-off-by: Peter Štibraný --- pkg/ingester/metrics.go | 23 +++++++++++++++++++---- pkg/ingester/metrics_test.go | 19 +++++++++++++++++++ 2 files changed, 38 insertions(+), 4 deletions(-) diff --git a/pkg/ingester/metrics.go b/pkg/ingester/metrics.go index d4d64daa272..79f5dd238eb 100644 --- a/pkg/ingester/metrics.go +++ b/pkg/ingester/metrics.go @@ -118,10 +118,12 @@ func newIngesterMetrics(r prometheus.Registerer, registerMetricsConflictingWithT type tsdbMetrics struct { // We aggregate metrics from individual TSDB registries into // a single set of counters, which are exposed as Cortex metrics. - dirSyncs *prometheus.Desc // sum(thanos_shipper_dir_syncs_total) - dirSyncFailures *prometheus.Desc // sum(thanos_shipper_dir_sync_failures_total) - uploads *prometheus.Desc // sum(thanos_shipper_uploads_total) - uploadFailures *prometheus.Desc // sum(thanos_shipper_upload_failures_total) + dirSyncs *prometheus.Desc // sum(thanos_shipper_dir_syncs_total) + dirSyncFailures *prometheus.Desc // sum(thanos_shipper_dir_sync_failures_total) + uploads *prometheus.Desc // sum(thanos_shipper_uploads_total) + uploadFailures *prometheus.Desc // sum(thanos_shipper_upload_failures_total) + compactionsTriggered *prometheus.Desc // sum(prometheus_tsdb_compactions_triggered_total) + compactionsFailed *prometheus.Desc // sum(prometheus_tsdb_compactions_failed_total) // These two metrics replace metrics in ingesterMetrics, as we count them differently memSeriesCreatedTotal *prometheus.Desc @@ -151,6 +153,14 @@ func newTSDBMetrics(r prometheus.Registerer) *tsdbMetrics { "cortex_ingester_shipper_upload_failures_total", "TSDB: Total number of failed object uploads", nil, nil), + compactionsTriggered: prometheus.NewDesc( + "cortex_ingester_tsdb_compactions_triggered_total", + "Total number of triggered compactions for the partition.", + nil, nil), + compactionsFailed: prometheus.NewDesc( + "cortex_ingester_tsdb_compactions_failed_total", + "Total number of compactions that failed for the partition.", + nil, nil), memSeriesCreatedTotal: prometheus.NewDesc(memSeriesCreatedTotalName, memSeriesCreatedTotalHelp, []string{"user"}, nil), memSeriesRemovedTotal: prometheus.NewDesc(memSeriesRemovedTotalName, memSeriesRemovedTotalHelp, []string{"user"}, nil), @@ -169,6 +179,8 @@ func (sm *tsdbMetrics) Describe(out chan<- *prometheus.Desc) { out <- sm.uploadFailures out <- sm.memSeriesCreatedTotal out <- sm.memSeriesRemovedTotal + out <- sm.compactionsTriggered + out <- sm.compactionsFailed } func (sm *tsdbMetrics) Collect(out chan<- prometheus.Metric) { @@ -182,6 +194,9 @@ func (sm *tsdbMetrics) Collect(out chan<- prometheus.Metric) { data.SendSumOfCountersPerUser(out, sm.memSeriesCreatedTotal, "prometheus_tsdb_head_series_created_total") data.SendSumOfCountersPerUser(out, sm.memSeriesRemovedTotal, "prometheus_tsdb_head_series_removed_total") + + data.SendSumOfCounters(out, sm.compactionsTriggered, "prometheus_tsdb_compactions_triggered_total") + data.SendSumOfCounters(out, sm.compactionsFailed, "prometheus_tsdb_compactions_failed_total") } // make a copy of the map, so that metrics can be gathered while the new registry is being added. 
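The collector above is easier to follow outside the diff. Below is a minimal,
self-contained sketch of the same pattern — per-tenant Prometheus registries
plus one custom collector that gathers them and re-exports a summed counter.
All names in it (sumCollector, the tenant IDs, the main function) are
illustrative only; the real code above delegates the summing to Cortex helper
functions such as SendSumOfCounters and SendSumOfCountersPerUser.

// Sketch only — not part of this patch series.
package main

import (
	"fmt"

	"github.com/prometheus/client_golang/prometheus"
)

// sumCollector re-exports the sum of one counter across many registries.
type sumCollector struct {
	regs   map[string]*prometheus.Registry // one registry per tenant
	desc   *prometheus.Desc                // the aggregated metric we expose
	source string                          // per-tenant counter name to sum
}

func (c *sumCollector) Describe(out chan<- *prometheus.Desc) {
	out <- c.desc
}

func (c *sumCollector) Collect(out chan<- prometheus.Metric) {
	sum := float64(0)
	for _, reg := range c.regs {
		mfs, err := reg.Gather() // snapshot of this tenant's metric families
		if err != nil {
			continue // real code would at least log this
		}
		for _, mf := range mfs {
			if mf.GetName() != c.source {
				continue
			}
			for _, m := range mf.GetMetric() {
				sum += m.GetCounter().GetValue()
			}
		}
	}
	out <- prometheus.MustNewConstMetric(c.desc, prometheus.CounterValue, sum)
}

func main() {
	// Two fake tenants, each with its own registry and TSDB-style counter.
	regs := map[string]*prometheus.Registry{}
	for _, tenant := range []string{"user1", "user2"} {
		reg := prometheus.NewRegistry()
		ctr := prometheus.NewCounter(prometheus.CounterOpts{
			Name: "prometheus_tsdb_compactions_triggered_total",
			Help: "per-tenant counter (sketch)",
		})
		reg.MustRegister(ctr)
		ctr.Add(3)
		regs[tenant] = reg
	}

	root := prometheus.NewRegistry()
	root.MustRegister(&sumCollector{
		regs:   regs,
		desc:   prometheus.NewDesc("cortex_ingester_tsdb_compactions_triggered_total", "aggregated (sketch)", nil, nil),
		source: "prometheus_tsdb_compactions_triggered_total",
	})

	mfs, _ := root.Gather()
	fmt.Println(mfs[0].GetMetric()[0].GetCounter().GetValue()) // prints 6
}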
diff --git a/pkg/ingester/metrics_test.go b/pkg/ingester/metrics_test.go index 749cbe4ca48..ee59fed003a 100644 --- a/pkg/ingester/metrics_test.go +++ b/pkg/ingester/metrics_test.go @@ -52,6 +52,14 @@ func TestTSDBMetrics(t *testing.T) { cortex_ingester_memory_series_removed_total{user="user1"} 74070 cortex_ingester_memory_series_removed_total{user="user2"} 514722 cortex_ingester_memory_series_removed_total{user="user3"} 5994 + + # HELP cortex_ingester_tsdb_compactions_triggered_total Total number of triggered compactions for the partition. + # TYPE cortex_ingester_tsdb_compactions_triggered_total counter + cortex_ingester_tsdb_compactions_triggered_total 693917 + + # HELP cortex_ingester_tsdb_compactions_failed_total Total number of compactions that failed for the partition. + # TYPE cortex_ingester_tsdb_compactions_failed_total counter + cortex_ingester_tsdb_compactions_failed_total 793048 `)) require.NoError(t, err) } @@ -95,12 +103,23 @@ func populateTSDBMetrics(base float64) *prometheus.Registry { }) seriesRemoved.Add(6 * base) + compactionsTriggered := prometheus.NewCounter(prometheus.CounterOpts{ + Name: "prometheus_tsdb_compactions_triggered_total", + }) + compactionsTriggered.Add(7 * base) + + compactionsFailed := prometheus.NewCounter(prometheus.CounterOpts{ + Name: "prometheus_tsdb_compactions_failed_total", + }) + compactionsFailed.Add(8 * base) + r.MustRegister(dirSyncs) r.MustRegister(dirSyncFailures) r.MustRegister(uploads) r.MustRegister(uploadFailures) r.MustRegister(seriesCreated) r.MustRegister(seriesRemoved) + r.MustRegister(compactionsTriggered, compactionsFailed) return r } From 853346be6e101cc3f4d5b139e5a0d8f482d7b880 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Peter=20S=CC=8Ctibrany=CC=81?= Date: Mon, 24 Feb 2020 10:59:50 +0100 Subject: [PATCH 14/21] Moved compaction metrics to TSDB state. MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit They are now updated in Cortex code, when we call compaction. Signed-off-by: Peter Štibraný --- pkg/ingester/ingester_v2.go | 18 ++++++++++++++++++ pkg/ingester/metrics.go | 25 +++++-------------------- pkg/ingester/metrics_test.go | 19 ------------------- 3 files changed, 23 insertions(+), 39 deletions(-) diff --git a/pkg/ingester/ingester_v2.go b/pkg/ingester/ingester_v2.go index c5f5c2952f7..ed7b4ddc2d7 100644 --- a/pkg/ingester/ingester_v2.go +++ b/pkg/ingester/ingester_v2.go @@ -65,6 +65,10 @@ type TSDBState struct { transferOnce sync.Once tsdbMetrics *tsdbMetrics + + // Head compactions metrics. 
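+	// Unlike the shipper metrics, these counters are owned and incremented by
+	// the ingester itself (around each Compact() call in compactBlocks), rather
+	// than aggregated from the per-tenant TSDB registries.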
+ compactionsTriggered prometheus.Counter + compactionsFailed prometheus.Counter } // NewV2 returns a new Ingester that uses prometheus block storage instead of chunk storage @@ -90,6 +94,16 @@ func NewV2(cfg Config, clientConfig client.Config, limits *validation.Overrides, dbs: make(map[string]*userTSDB), bucket: bucketClient, tsdbMetrics: newTSDBMetrics(registerer), + + compactionsTriggered: prometheus.NewCounter(prometheus.CounterOpts{ + Name: "cortex_ingester_tsdb_compactions_triggered_total", + Help: "Total number of triggered compactions.", + }), + + compactionsFailed: prometheus.NewCounter(prometheus.CounterOpts{ + Name: "cortex_ingester_tsdb_compactions_failed_total", + Help: "Total number of compactions that failed.", + }), }, } @@ -101,6 +115,8 @@ func NewV2(cfg Config, clientConfig client.Config, limits *validation.Overrides, Name: "cortex_ingester_memory_series", Help: "The current number of series in memory.", }, i.numSeriesInTSDB)) + registerer.MustRegister(i.TSDBState.compactionsTriggered) + registerer.MustRegister(i.TSDBState.compactionsFailed) } i.lifecycler, err = ring.NewLifecycler(cfg.LifecyclerConfig, i, "ingester", ring.IngesterRingKey, true) @@ -844,8 +860,10 @@ func (i *Ingester) compactBlocks() { return } + i.TSDBState.compactionsTriggered.Inc() err := userDB.Compact() if err != nil { + i.TSDBState.compactionsFailed.Inc() level.Warn(util.Logger).Log("msg", "TSDB blocks compaction for user has failed", "user", userID, "err", err) } else { level.Debug(util.Logger).Log("msg", "TSDB blocks compaction completed successfully", "user", userID) diff --git a/pkg/ingester/metrics.go b/pkg/ingester/metrics.go index 79f5dd238eb..27c9ee0d3ab 100644 --- a/pkg/ingester/metrics.go +++ b/pkg/ingester/metrics.go @@ -114,16 +114,14 @@ func newIngesterMetrics(r prometheus.Registerer, registerMetricsConflictingWithT return m } -// TSDB metrics. Each tenant has its own registry, that TSDB code uses. +// TSDB metrics collector. Each tenant has its own registry, that TSDB code uses. type tsdbMetrics struct { // We aggregate metrics from individual TSDB registries into // a single set of counters, which are exposed as Cortex metrics. 
From 286265c0c8073c128862683b20c385e7f577ca28 Mon Sep 17 00:00:00 2001
From: =?UTF-8?q?Peter=20S=CC=8Ctibrany=CC=81?=
Date: Mon, 24 Feb 2020 11:00:24 +0100
Subject: [PATCH 15/21] Decrease compaction interval.
MIME-Version: 1.0
Content-Type: text/plain; charset=UTF-8
Content-Transfer-Encoding: 8bit

Signed-off-by: Peter Štibraný
---
 pkg/storage/tsdb/config.go | 2 +-
 1 file changed, 1 insertion(+), 1 deletion(-)

diff --git a/pkg/storage/tsdb/config.go b/pkg/storage/tsdb/config.go
index 22c6dbc03e6..6f661469d38 100644
--- a/pkg/storage/tsdb/config.go
+++ b/pkg/storage/tsdb/config.go
@@ -112,7 +112,7 @@ func (cfg *Config) RegisterFlags(f *flag.FlagSet) {
 	f.IntVar(&cfg.ShipConcurrency, "experimental.tsdb.ship-concurrency", 10, "Maximum number of tenants concurrently shipping blocks to the storage.")
 	f.StringVar(&cfg.Backend, "experimental.tsdb.backend", "s3", "TSDB storage backend to use")
 	f.IntVar(&cfg.MaxTSDBOpeningConcurrencyOnStartup, "experimental.tsdb.max-tsdb-opening-concurrency-on-startup", 10, "limit the number of concurrently opening TSDB's on startup")
-	f.DurationVar(&cfg.HeadCompactionInterval, "experimental.tsdb.head-compaction-interval", 2*time.Hour, "How frequently should locally-stored TSDB blocks be compacted")
+	f.DurationVar(&cfg.HeadCompactionInterval, "experimental.tsdb.head-compaction-interval", 5*time.Minute, "How frequently should locally-stored TSDB blocks be compacted")
 	f.IntVar(&cfg.HeadCompactionConcurrency, "experimental.tsdb.head-compaction-concurrency", 5, "Maximum number of tenants concurrently compacting locally-stored TSDB blocks")
 }
From 4535ed38094d5041d6507c9d3cc18d708111f263 Mon Sep 17 00:00:00 2001
From: =?UTF-8?q?Peter=20S=CC=8Ctibrany=CC=81?=
Date: Mon, 24 Feb 2020 11:09:58 +0100
Subject: [PATCH 16/21] Updated help and changelog entry.
MIME-Version: 1.0
Content-Type: text/plain; charset=UTF-8
Content-Transfer-Encoding: 8bit

Signed-off-by: Peter Štibraný
---
 CHANGELOG.md               | 2 +-
 pkg/storage/tsdb/config.go | 2 +-
 2 files changed, 2 insertions(+), 2 deletions(-)

diff --git a/CHANGELOG.md b/CHANGELOG.md
index 84807c10cee..01e54f88902 100644
--- a/CHANGELOG.md
+++ b/CHANGELOG.md
@@ -23,7 +23,7 @@
 * [CHANGE] Renamed the cache configuration setting `defaul_validity` to `default_validity`. #2140
 * [CHANGE] Removed unused /validate_expr endpoint. #2152
 * [CHANGE] Updated Prometheus dependency to v2.16.0. This Prometheus version uses Active Query Tracker to limit concurrent queries. In order to keep `-querier.max-concurrent` working, Active Query Tracker is enabled by default, and is configured to store its data to `active-query-tracker` directory (relative to current directory when Cortex started). This can be changed by using `-querier.active-query-tracker-dir` option. Purpose of Active Query Tracker is to log queries that were running when Cortex crashes. This logging happens on next Cortex start. #2088
-* [CHANGE] Experimental TSDB: TSDB head compaction interval is now configurable (defaults to 2h). Shipping now happens right after compaction, and shipping interval and concurrency options were removed. Shipping can still be disabled. #2172
+* [CHANGE] Experimental TSDB: TSDB head compaction interval and concurrency are now configurable (defaults to 5m interval and 5 concurrent head compactions). New options: `-experimental.tsdb.head-compaction-interval` and `-experimental.tsdb.head-compaction-concurrency`. #2172
 * [FEATURE] Added a read-only local alertmanager config store using files named corresponding to their tenant id. #2125
 * [FEATURE] Added user sub rings to distribute users to a subset of ingesters. #1947
   * `--experimental.distributor.user-subring-size`
diff --git a/pkg/storage/tsdb/config.go b/pkg/storage/tsdb/config.go
index 6f661469d38..343bcbe3114 100644
--- a/pkg/storage/tsdb/config.go
+++ b/pkg/storage/tsdb/config.go
@@ -112,7 +112,7 @@ func (cfg *Config) RegisterFlags(f *flag.FlagSet) {
 	f.IntVar(&cfg.ShipConcurrency, "experimental.tsdb.ship-concurrency", 10, "Maximum number of tenants concurrently shipping blocks to the storage.")
 	f.StringVar(&cfg.Backend, "experimental.tsdb.backend", "s3", "TSDB storage backend to use")
 	f.IntVar(&cfg.MaxTSDBOpeningConcurrencyOnStartup, "experimental.tsdb.max-tsdb-opening-concurrency-on-startup", 10, "limit the number of concurrently opening TSDB's on startup")
-	f.DurationVar(&cfg.HeadCompactionInterval, "experimental.tsdb.head-compaction-interval", 5*time.Minute, "How frequently should locally-stored TSDB blocks be compacted")
+	f.DurationVar(&cfg.HeadCompactionInterval, "experimental.tsdb.head-compaction-interval", 5*time.Minute, "How frequently Cortex tries to compact the TSDB head. A block is only created if data covers the smallest block range.")
 	f.IntVar(&cfg.HeadCompactionConcurrency, "experimental.tsdb.head-compaction-concurrency", 5, "Maximum number of tenants concurrently compacting locally-stored TSDB blocks")
 }
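The new help text ("A block is only created if data covers the smallest block range") leans on Prometheus TSDB behaviour: as far as the upstream code goes, `db.Compact()` only cuts a block from the head once the head spans more than 1.5x the smallest block range (2h by default), so a frequent compaction interval mostly results in cheap no-ops. A sketch of that rule under this assumption; the function and names below are illustrative, not the TSDB API:

```go
package main

import "fmt"

// headCompactable approximates Prometheus TSDB's rule for when db.Compact()
// will actually cut a block from the head: the head must cover more than
// 1.5x the smallest block range. Timestamps are milliseconds, as in TSDB.
func headCompactable(headMinT, headMaxT, blockRangeMs int64) bool {
	return headMaxT-headMinT > blockRangeMs/2*3
}

func main() {
	const blockRange = 2 * 60 * 60 * 1000 // 2h default block range, in ms

	fmt.Println(headCompactable(0, 1*60*60*1000, blockRange)) // false: only 1h of data
	fmt.Println(headCompactable(0, 4*60*60*1000, blockRange)) // true: 4h > 3h threshold
}
```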
From 87cbc7376cbf02f2009ffbd67f2a505cf248cd06 Mon Sep 17 00:00:00 2001
From: =?UTF-8?q?Peter=20S=CC=8Ctibrany=CC=81?=
Date: Mon, 24 Feb 2020 09:24:21 +0100
Subject: [PATCH 17/21] Documentation
MIME-Version: 1.0
Content-Type: text/plain; charset=UTF-8
Content-Transfer-Encoding: 8bit

Signed-off-by: Peter Štibraný
---
 docs/operations/blocks-storage.md | 2 +-
 1 file changed, 1 insertion(+), 1 deletion(-)

diff --git a/docs/operations/blocks-storage.md b/docs/operations/blocks-storage.md
index 0ad52fbdfee..1fbe122caed 100644
--- a/docs/operations/blocks-storage.md
+++ b/docs/operations/blocks-storage.md
@@ -123,7 +123,7 @@ tsdb:
   # CLI flag: -experimental.tsdb.ship-concurrency
   [ship_concurrency: <int> | default = 10]
 
-  # How frequently are TSDB heads compacted.
+  # How frequently Cortex tries to compact the TSDB head. A block is only created if data covers the smallest block range.
   # CLI flag: -experimental.tsdb.head-compaction-interval
   [head_compaction_interval: <duration> | default = 2h]
From 8fee89bdc976115d465d321558c5802e9e0b5ee5 Mon Sep 17 00:00:00 2001
From: =?UTF-8?q?Peter=20S=CC=8Ctibrany=CC=81?=
Date: Mon, 24 Feb 2020 13:14:08 +0100
Subject: [PATCH 18/21] Fix default value.
MIME-Version: 1.0
Content-Type: text/plain; charset=UTF-8
Content-Transfer-Encoding: 8bit

Signed-off-by: Peter Štibraný
---
 docs/operations/blocks-storage.md | 2 +-
 1 file changed, 1 insertion(+), 1 deletion(-)

diff --git a/docs/operations/blocks-storage.md b/docs/operations/blocks-storage.md
index 1fbe122caed..2dbbf2cbf75 100644
--- a/docs/operations/blocks-storage.md
+++ b/docs/operations/blocks-storage.md
@@ -125,7 +125,7 @@ tsdb:
 
   # How frequently Cortex tries to compact the TSDB head. A block is only created if data covers the smallest block range.
   # CLI flag: -experimental.tsdb.head-compaction-interval
-  [head_compaction_interval: <duration> | default = 2h]
+  [head_compaction_interval: <duration> | default = 5m]
 
   # Maximum number of tenants concurrently compacting TSDB head.
   # CLI flag: -experimental.tsdb.head-compaction-concurrency
From cd72ae92dcf1b82d74a56af167f07774532848c9 Mon Sep 17 00:00:00 2001
From: =?UTF-8?q?Peter=20S=CC=8Ctibrany=CC=81?=
Date: Mon, 24 Feb 2020 15:09:11 +0100
Subject: [PATCH 19/21] Review feedback:
MIME-Version: 1.0
Content-Type: text/plain; charset=UTF-8
Content-Transfer-Encoding: 8bit

- decrease default to 1 min
- compaction cannot be disabled
- compaction interval must be greater than 0, and <= 5 mins

Signed-off-by: Peter Štibraný
---
 docs/operations/blocks-storage.md |  5 +++--
 pkg/ingester/ingester_v2.go       |  6 ++----
 pkg/storage/tsdb/config.go        |  8 ++++----
 pkg/storage/tsdb/config_test.go   | 25 +++++++++++++++++++------
 4 files changed, 28 insertions(+), 16 deletions(-)

diff --git a/docs/operations/blocks-storage.md b/docs/operations/blocks-storage.md
index 2dbbf2cbf75..f8970712eb9 100644
--- a/docs/operations/blocks-storage.md
+++ b/docs/operations/blocks-storage.md
@@ -124,10 +124,11 @@ tsdb:
   [ship_concurrency: <int> | default = 10]
 
   # How frequently Cortex tries to compact the TSDB head. A block is only created if data covers the smallest block range.
+  # Must be greater than 0 and max 5 minutes.
   # CLI flag: -experimental.tsdb.head-compaction-interval
-  [head_compaction_interval: <duration> | default = 5m]
+  [head_compaction_interval: <duration> | default = 1m]
 
-  # Maximum number of tenants concurrently compacting TSDB head.
+  # Maximum number of tenants concurrently compacting TSDB head into a new block.
   # CLI flag: -experimental.tsdb.head-compaction-concurrency
   [head_compaction_concurrency: <int> | default = 5]
diff --git a/pkg/ingester/ingester_v2.go b/pkg/ingester/ingester_v2.go
index ed7b4ddc2d7..62b6866e797 100644
--- a/pkg/ingester/ingester_v2.go
+++ b/pkg/ingester/ingester_v2.go
@@ -145,10 +145,8 @@ func NewV2(cfg Config, clientConfig client.Config, limits *validation.Overrides,
 	i.done.Add(1)
 	go i.updateLoop()
 
-	if i.cfg.TSDBConfig.HeadCompactionConcurrency > 0 && i.cfg.TSDBConfig.HeadCompactionInterval > 0 {
-		i.done.Add(1)
-		go i.compactionLoop()
-	}
+	i.done.Add(1)
+	go i.compactionLoop()
 
 	return i, nil
 }
diff --git a/pkg/storage/tsdb/config.go b/pkg/storage/tsdb/config.go
index 343bcbe3114..0c84200d21f 100644
--- a/pkg/storage/tsdb/config.go
+++ b/pkg/storage/tsdb/config.go
@@ -112,8 +112,8 @@ func (cfg *Config) RegisterFlags(f *flag.FlagSet) {
 	f.IntVar(&cfg.ShipConcurrency, "experimental.tsdb.ship-concurrency", 10, "Maximum number of tenants concurrently shipping blocks to the storage.")
 	f.StringVar(&cfg.Backend, "experimental.tsdb.backend", "s3", "TSDB storage backend to use")
 	f.IntVar(&cfg.MaxTSDBOpeningConcurrencyOnStartup, "experimental.tsdb.max-tsdb-opening-concurrency-on-startup", 10, "limit the number of concurrently opening TSDB's on startup")
-	f.DurationVar(&cfg.HeadCompactionInterval, "experimental.tsdb.head-compaction-interval", 5*time.Minute, "How frequently Cortex tries to compact the TSDB head. A block is only created if data covers the smallest block range.")
-	f.IntVar(&cfg.HeadCompactionConcurrency, "experimental.tsdb.head-compaction-concurrency", 5, "Maximum number of tenants concurrently compacting locally-stored TSDB blocks")
+	f.DurationVar(&cfg.HeadCompactionInterval, "experimental.tsdb.head-compaction-interval", 1*time.Minute, "How frequently Cortex tries to compact the TSDB head. A block is only created if data covers the smallest block range. Must be greater than 0 and max 5 minutes.")
+	f.IntVar(&cfg.HeadCompactionConcurrency, "experimental.tsdb.head-compaction-concurrency", 5, "Maximum number of tenants concurrently compacting TSDB head into a new block")
 }
 
 // Validate the config
@@ -126,11 +126,11 @@ func (cfg *Config) Validate() error {
 		return errInvalidShipConcurrency
 	}
 
-	if cfg.HeadCompactionInterval < 0 {
+	if cfg.HeadCompactionInterval <= 0 || cfg.HeadCompactionInterval > 5*time.Minute {
 		return errInvalidCompactionInterval
 	}
 
-	if cfg.HeadCompactionInterval > 0 && cfg.HeadCompactionConcurrency <= 0 {
+	if cfg.HeadCompactionConcurrency <= 0 {
 		return errInvalidCompactionConcurrency
 	}
 
diff --git a/pkg/storage/tsdb/config_test.go b/pkg/storage/tsdb/config_test.go
index e76f4566364..0a2514df063 100644
--- a/pkg/storage/tsdb/config_test.go
+++ b/pkg/storage/tsdb/config_test.go
@@ -17,13 +17,17 @@ func TestConfig_Validate(t *testing.T) {
 	}{
 		"should pass on S3 backend": {
 			config: Config{
-				Backend: "s3",
+				Backend:                   "s3",
+				HeadCompactionInterval:    1 * time.Minute,
+				HeadCompactionConcurrency: 5,
 			},
 			expectedErr: nil,
 		},
 		"should pass on GCS backend": {
 			config: Config{
-				Backend: "gcs",
+				Backend:                   "gcs",
+				HeadCompactionInterval:    1 * time.Minute,
+				HeadCompactionConcurrency: 5,
 			},
 			expectedErr: nil,
 		},
@@ -43,16 +47,25 @@ func TestConfig_Validate(t *testing.T) {
 		},
 		"should pass on invalid ship concurrency but shipping is disabled": {
 			config: Config{
-				Backend:         "s3",
-				ShipInterval:    0,
-				ShipConcurrency: 0,
+				Backend:                   "s3",
+				ShipInterval:              0,
+				ShipConcurrency:           0,
+				HeadCompactionInterval:    1 * time.Minute,
+				HeadCompactionConcurrency: 5,
 			},
 			expectedErr: nil,
 		},
 		"should fail on invalid compaction interval": {
 			config: Config{
 				Backend:                "s3",
-				HeadCompactionInterval: -1 * time.Minute,
+				HeadCompactionInterval: 0 * time.Minute,
 			},
 			expectedErr: errInvalidCompactionInterval,
 		},
+		"should fail on too high compaction interval": {
+			config: Config{
+				Backend:                "s3",
+				HeadCompactionInterval: 10 * time.Minute,
+			},
+			expectedErr: errInvalidCompactionInterval,
+		},
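Patch 19 makes head compaction an always-on behaviour: the loop is started unconditionally, and Validate() now rejects both a non-positive interval (compaction can no longer be disabled) and anything above 5 minutes. A sketch of how those rules behave from a caller's perspective, written as a table-driven test that would sit next to config.go (the test name is made up; Config, Validate and the field names come from the diff above):

```go
package tsdb

import (
	"testing"
	"time"
)

// TestHeadCompactionIntervalBounds exercises the new validation rule:
// 0 < HeadCompactionInterval <= 5m, with concurrency always > 0.
func TestHeadCompactionIntervalBounds(t *testing.T) {
	for _, tc := range []struct {
		interval time.Duration
		wantErr  bool
	}{
		{interval: 0, wantErr: true},                // disabling compaction is rejected
		{interval: time.Minute, wantErr: false},     // the new default passes
		{interval: 5 * time.Minute, wantErr: false}, // the cap itself is allowed
		{interval: 10 * time.Minute, wantErr: true}, // above the 5m cap
	} {
		cfg := Config{
			Backend:                   "s3",
			HeadCompactionInterval:    tc.interval,
			HeadCompactionConcurrency: 5,
		}
		if err := cfg.Validate(); (err != nil) != tc.wantErr {
			t.Errorf("interval %v: got err=%v, wantErr=%v", tc.interval, err, tc.wantErr)
		}
	}
}
```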
From c0de26d6b221b7b09f04781bd43dc30575e9f040 Mon Sep 17 00:00:00 2001
From: =?UTF-8?q?Peter=20S=CC=8Ctibrany=CC=81?=
Date: Mon, 24 Feb 2020 15:15:27 +0100
Subject: [PATCH 20/21] Fix CHANGELOG.md
MIME-Version: 1.0
Content-Type: text/plain; charset=UTF-8
Content-Transfer-Encoding: 8bit

Signed-off-by: Peter Štibraný
---
 CHANGELOG.md | 2 +-
 1 file changed, 1 insertion(+), 1 deletion(-)

diff --git a/CHANGELOG.md b/CHANGELOG.md
index 01e54f88902..41a403e54e2 100644
--- a/CHANGELOG.md
+++ b/CHANGELOG.md
@@ -23,7 +23,7 @@
 * [CHANGE] Renamed the cache configuration setting `defaul_validity` to `default_validity`. #2140
 * [CHANGE] Removed unused /validate_expr endpoint. #2152
 * [CHANGE] Updated Prometheus dependency to v2.16.0. This Prometheus version uses Active Query Tracker to limit concurrent queries. In order to keep `-querier.max-concurrent` working, Active Query Tracker is enabled by default, and is configured to store its data to `active-query-tracker` directory (relative to current directory when Cortex started). This can be changed by using `-querier.active-query-tracker-dir` option. Purpose of Active Query Tracker is to log queries that were running when Cortex crashes. This logging happens on next Cortex start. #2088
-* [CHANGE] Experimental TSDB: TSDB head compaction interval and concurrency are now configurable (defaults to 5m interval and 5 concurrent head compactions). New options: `-experimental.tsdb.head-compaction-interval` and `-experimental.tsdb.head-compaction-concurrency`. #2172
+* [CHANGE] Experimental TSDB: TSDB head compaction interval and concurrency are now configurable (defaults to 1 min interval and 5 concurrent head compactions). New options: `-experimental.tsdb.head-compaction-interval` and `-experimental.tsdb.head-compaction-concurrency`. #2172
 * [FEATURE] Added a read-only local alertmanager config store using files named corresponding to their tenant id. #2125
 * [FEATURE] Added user sub rings to distribute users to a subset of ingesters. #1947
   * `--experimental.distributor.user-subring-size`
From 10b65f98039b81500f3ce47054f20806edbb9c47 Mon Sep 17 00:00:00 2001
From: =?UTF-8?q?Peter=20S=CC=8Ctibrany=CC=81?=
Date: Mon, 24 Feb 2020 15:16:43 +0100
Subject: [PATCH 21/21] Fix formatting to make lint happy.
MIME-Version: 1.0
Content-Type: text/plain; charset=UTF-8
Content-Transfer-Encoding: 8bit

Signed-off-by: Peter Štibraný
---
 pkg/util/grpcclient/ratelimit_test.go | 2 +-
 1 file changed, 1 insertion(+), 1 deletion(-)

diff --git a/pkg/util/grpcclient/ratelimit_test.go b/pkg/util/grpcclient/ratelimit_test.go
index 6e7eb7a466d..6a8d6345b9b 100644
--- a/pkg/util/grpcclient/ratelimit_test.go
+++ b/pkg/util/grpcclient/ratelimit_test.go
@@ -8,7 +8,7 @@ import (
 	"google.golang.org/grpc"
 	"google.golang.org/grpc/codes"
 	"google.golang.org/grpc/status"
-	
+
 	"github.com/cortexproject/cortex/pkg/util/grpcclient"
 )
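The final patch is a pure formatting fix: judging by the diff, the blank line separating the external import group from the cortexproject group carried trailing whitespace, which gofmt-based linting rejects. For reference, a minimal file with the grouping the linter expects (the test body is filler so the file compiles; `grpcclient.Config` is assumed from the package under test):

```go
package grpcclient_test

// Import groups are separated by a single clean blank line:
// standard library first, then external modules, then cortexproject packages.
import (
	"testing"

	"google.golang.org/grpc"

	"github.com/cortexproject/cortex/pkg/util/grpcclient"
)

func TestImportGroupingSketch(t *testing.T) {
	_ = grpc.WithInsecure      // reference the external import
	var _ grpcclient.Config    // reference the internal import
}
```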