diff --git a/pkg/ingester/ingester_v2.go b/pkg/ingester/ingester_v2.go index be319bc9eab..7fde6eec60d 100644 --- a/pkg/ingester/ingester_v2.go +++ b/pkg/ingester/ingester_v2.go @@ -16,17 +16,16 @@ import ( "github.com/prometheus/client_golang/prometheus" "github.com/prometheus/client_golang/prometheus/promauto" "github.com/prometheus/common/model" - "github.com/prometheus/prometheus/pkg/gate" "github.com/prometheus/prometheus/pkg/labels" "github.com/prometheus/prometheus/storage" "github.com/prometheus/prometheus/tsdb" - tsdb_errors "github.com/prometheus/prometheus/tsdb/errors" "github.com/thanos-io/thanos/pkg/block/metadata" "github.com/thanos-io/thanos/pkg/objstore" "github.com/thanos-io/thanos/pkg/shipper" "github.com/weaveworks/common/httpgrpc" "github.com/weaveworks/common/user" "go.uber.org/atomic" + "golang.org/x/sync/errgroup" "github.com/cortexproject/cortex/pkg/ingester/client" "github.com/cortexproject/cortex/pkg/ring" @@ -1096,98 +1095,100 @@ func (i *Ingester) closeAllTSDB() { // concurrently opening TSDB. func (i *Ingester) openExistingTSDB(ctx context.Context) error { level.Info(util.Logger).Log("msg", "opening existing TSDBs") - wg := &sync.WaitGroup{} - openGate := gate.New(i.cfg.BlocksStorageConfig.TSDB.MaxTSDBOpeningConcurrencyOnStartup) - // Keep track of all errors that could occur. - errs := tsdb_errors.MultiError{} - errsMx := sync.Mutex{} + queue := make(chan string) + group, groupCtx := errgroup.WithContext(ctx) - walkErr := filepath.Walk(i.cfg.BlocksStorageConfig.TSDB.Dir, func(path string, info os.FileInfo, err error) error { - if err != nil { - // If the root directory doesn't exist, we're OK (not needed to be created upfront). - if os.IsNotExist(err) && path == i.cfg.BlocksStorageConfig.TSDB.Dir { - return filepath.SkipDir - } + // Create a pool of workers which will open existing TSDBs. + for n := 0; n < i.cfg.BlocksStorageConfig.TSDB.MaxTSDBOpeningConcurrencyOnStartup; n++ { + group.Go(func() error { + for userID := range queue { + startTime := time.Now() - level.Error(util.Logger).Log("msg", "an error occurred while iterating the filesystem storing TSDBs", "path", path, "err", err) - return errors.Wrapf(err, "an error occurred while iterating the filesystem storing TSDBs at %s", path) - } + db, err := i.createTSDB(userID) + if err != nil { + level.Error(util.Logger).Log("msg", "unable to open TSDB", "err", err, "user", userID) + return errors.Wrapf(err, "unable to open TSDB for user %s", userID) + } + + // Add the database to the map of user databases + i.userStatesMtx.Lock() + i.TSDBState.dbs[userID] = db + i.userStatesMtx.Unlock() + i.metrics.memUsers.Inc() + + i.TSDBState.walReplayTime.Observe(time.Since(startTime).Seconds()) + } - // Skip root dir and all other files - if path == i.cfg.BlocksStorageConfig.TSDB.Dir || !info.IsDir() { return nil - } + }) + } - // Top level directories are assumed to be user TSDBs - userID := info.Name() - f, err := os.Open(path) - if err != nil { - level.Error(util.Logger).Log("msg", "unable to open TSDB dir", "err", err, "user", userID, "path", path) - return errors.Wrapf(err, "unable to open TSDB dir %s for user %s", path, userID) - } - defer f.Close() + // Spawn a goroutine to find all users with a TSDB on the filesystem. + group.Go(func() error { + // Close the queue once filesystem walking is done. + defer close(queue) + + walkErr := filepath.Walk(i.cfg.BlocksStorageConfig.TSDB.Dir, func(path string, info os.FileInfo, err error) error { + if err != nil { + // If the root directory doesn't exist, we're OK (not needed to be created upfront). + if os.IsNotExist(err) && path == i.cfg.BlocksStorageConfig.TSDB.Dir { + return filepath.SkipDir + } - // If the dir is empty skip it - if _, err := f.Readdirnames(1); err != nil { - if err == io.EOF { - return filepath.SkipDir + level.Error(util.Logger).Log("msg", "an error occurred while iterating the filesystem storing TSDBs", "path", path, "err", err) + return errors.Wrapf(err, "an error occurred while iterating the filesystem storing TSDBs at %s", path) } - level.Error(util.Logger).Log("msg", "unable to read TSDB dir", "err", err, "user", userID, "path", path) - return errors.Wrapf(err, "unable to read TSDB dir %s for user %s", path, userID) - } + // Skip root dir and all other files + if path == i.cfg.BlocksStorageConfig.TSDB.Dir || !info.IsDir() { + return nil + } - // Limit the number of TSDB's opening concurrently. Start blocks until there's a free spot available or the context is cancelled. - if err := openGate.Start(ctx); err != nil { - return err - } + // Top level directories are assumed to be user TSDBs + userID := info.Name() + f, err := os.Open(path) + if err != nil { + level.Error(util.Logger).Log("msg", "unable to open TSDB dir", "err", err, "user", userID, "path", path) + return errors.Wrapf(err, "unable to open TSDB dir %s for user %s", path, userID) + } + defer f.Close() - wg.Add(1) - go func(userID string) { - defer wg.Done() - defer openGate.Done() - defer func(ts time.Time) { - i.TSDBState.walReplayTime.Observe(time.Since(ts).Seconds()) - }(time.Now()) + // If the dir is empty skip it + if _, err := f.Readdirnames(1); err != nil { + if err == io.EOF { + return filepath.SkipDir + } - db, err := i.createTSDB(userID) - if err != nil { - errsMx.Lock() - errs.Add(errors.Wrapf(err, "unable to open TSDB for user %s", userID)) - errsMx.Unlock() + level.Error(util.Logger).Log("msg", "unable to read TSDB dir", "err", err, "user", userID, "path", path) + return errors.Wrapf(err, "unable to read TSDB dir %s for user %s", path, userID) + } - level.Error(util.Logger).Log("msg", "unable to open TSDB", "err", err, "user", userID) - return + // Enqueue the user to be processed. + select { + case queue <- userID: + // Nothing to do. + case <-groupCtx.Done(): + // Interrupt in case a failure occurred in another goroutine. + return nil } - // Add the database to the map of user databases - i.userStatesMtx.Lock() - i.TSDBState.dbs[userID] = db - i.userStatesMtx.Unlock() - i.metrics.memUsers.Inc() - }(userID) + // Don't descend into subdirectories. + return filepath.SkipDir + }) - return filepath.SkipDir // Don't descend into directories + return errors.Wrapf(walkErr, "unable to walk directory %s containing existing TSDBs", i.cfg.BlocksStorageConfig.TSDB.Dir) }) - if walkErr != nil { - errsMx.Lock() - errs.Add(errors.Wrapf(walkErr, "unable to walk directory %s containing existing TSDBs", i.cfg.BlocksStorageConfig.TSDB.Dir)) - errsMx.Unlock() - } - - // Wait for all opening routines to finish - wg.Wait() - - // Ensure no error occurred. - if errs.Err() == nil { - level.Info(util.Logger).Log("msg", "successfully opened existing TSDBs") - return nil + // Wait for all workers to complete. + err := group.Wait() + if err != nil { + level.Error(util.Logger).Log("msg", "error while opening existing TSDBs", "err", err) + return err } - level.Error(util.Logger).Log("msg", "error while opening existing TSDBs", "err", errs.Error()) - return errs.Err() + level.Info(util.Logger).Log("msg", "successfully opened existing TSDBs") + return nil } // numSeriesInTSDB returns the total number of in-memory series across all open TSDBs. diff --git a/pkg/ingester/ingester_v2_test.go b/pkg/ingester/ingester_v2_test.go index 8e7a9c57089..dfdb5a986d7 100644 --- a/pkg/ingester/ingester_v2_test.go +++ b/pkg/ingester/ingester_v2_test.go @@ -1602,11 +1602,13 @@ func TestIngester_v2OpenExistingTSDBOnStartup(t *testing.T) { t.Parallel() tests := map[string]struct { + concurrency int setup func(*testing.T, string) check func(*testing.T, *Ingester) expectedErr string }{ "should not load TSDB if the user directory is empty": { + concurrency: 10, setup: func(t *testing.T, dir string) { require.NoError(t, os.Mkdir(filepath.Join(dir, "user0"), 0700)) }, @@ -1615,12 +1617,14 @@ func TestIngester_v2OpenExistingTSDBOnStartup(t *testing.T) { }, }, "should not load any TSDB if the root directory is empty": { - setup: func(t *testing.T, dir string) {}, + concurrency: 10, + setup: func(t *testing.T, dir string) {}, check: func(t *testing.T, i *Ingester) { require.Zero(t, len(i.TSDBState.dbs)) }, }, "should not load any TSDB is the root directory is missing": { + concurrency: 10, setup: func(t *testing.T, dir string) { require.NoError(t, os.Remove(dir)) }, @@ -1629,6 +1633,7 @@ func TestIngester_v2OpenExistingTSDBOnStartup(t *testing.T) { }, }, "should load TSDB for any non-empty user directory": { + concurrency: 10, setup: func(t *testing.T, dir string) { require.NoError(t, os.MkdirAll(filepath.Join(dir, "user0", "dummy"), 0700)) require.NoError(t, os.MkdirAll(filepath.Join(dir, "user1", "dummy"), 0700)) @@ -1641,7 +1646,26 @@ func TestIngester_v2OpenExistingTSDBOnStartup(t *testing.T) { require.Nil(t, i.getTSDB("user2")) }, }, - "should fail and rollback if an error occur while loading any user's TSDB": { + "should load all TSDBs on concurrency < number of TSDBs": { + concurrency: 2, + setup: func(t *testing.T, dir string) { + require.NoError(t, os.MkdirAll(filepath.Join(dir, "user0", "dummy"), 0700)) + require.NoError(t, os.MkdirAll(filepath.Join(dir, "user1", "dummy"), 0700)) + require.NoError(t, os.MkdirAll(filepath.Join(dir, "user2", "dummy"), 0700)) + require.NoError(t, os.MkdirAll(filepath.Join(dir, "user3", "dummy"), 0700)) + require.NoError(t, os.MkdirAll(filepath.Join(dir, "user4", "dummy"), 0700)) + }, + check: func(t *testing.T, i *Ingester) { + require.Equal(t, 5, len(i.TSDBState.dbs)) + require.NotNil(t, i.getTSDB("user0")) + require.NotNil(t, i.getTSDB("user1")) + require.NotNil(t, i.getTSDB("user2")) + require.NotNil(t, i.getTSDB("user3")) + require.NotNil(t, i.getTSDB("user4")) + }, + }, + "should fail and rollback if an error occur while loading a TSDB on concurrency > number of TSDBs": { + concurrency: 10, setup: func(t *testing.T, dir string) { // Create a fake TSDB on disk with an empty chunks head segment file (it's invalid and // opening TSDB should fail). @@ -1658,6 +1682,30 @@ func TestIngester_v2OpenExistingTSDBOnStartup(t *testing.T) { }, expectedErr: "unable to open TSDB for user user0", }, + "should fail and rollback if an error occur while loading a TSDB on concurrency < number of TSDBs": { + concurrency: 2, + setup: func(t *testing.T, dir string) { + require.NoError(t, os.MkdirAll(filepath.Join(dir, "user0", "dummy"), 0700)) + require.NoError(t, os.MkdirAll(filepath.Join(dir, "user1", "dummy"), 0700)) + require.NoError(t, os.MkdirAll(filepath.Join(dir, "user3", "dummy"), 0700)) + require.NoError(t, os.MkdirAll(filepath.Join(dir, "user4", "dummy"), 0700)) + + // Create a fake TSDB on disk with an empty chunks head segment file (it's invalid and + // opening TSDB should fail). + require.NoError(t, os.MkdirAll(filepath.Join(dir, "user2", "wal", ""), 0700)) + require.NoError(t, os.MkdirAll(filepath.Join(dir, "user2", "chunks_head", ""), 0700)) + require.NoError(t, ioutil.WriteFile(filepath.Join(dir, "user2", "chunks_head", "00000001"), nil, 0700)) + }, + check: func(t *testing.T, i *Ingester) { + require.Equal(t, 0, len(i.TSDBState.dbs)) + require.Nil(t, i.getTSDB("user0")) + require.Nil(t, i.getTSDB("user1")) + require.Nil(t, i.getTSDB("user2")) + require.Nil(t, i.getTSDB("user3")) + require.Nil(t, i.getTSDB("user4")) + }, + expectedErr: "unable to open TSDB for user user2", + }, } for name, test := range tests { @@ -1678,6 +1726,7 @@ func TestIngester_v2OpenExistingTSDBOnStartup(t *testing.T) { ingesterCfg := defaultIngesterTestConfig() ingesterCfg.BlocksStorageEnabled = true ingesterCfg.BlocksStorageConfig.TSDB.Dir = tempDir + ingesterCfg.BlocksStorageConfig.TSDB.MaxTSDBOpeningConcurrencyOnStartup = testData.concurrency ingesterCfg.BlocksStorageConfig.Bucket.Backend = "s3" ingesterCfg.BlocksStorageConfig.Bucket.S3.Endpoint = "localhost" diff --git a/pkg/storage/tsdb/config.go b/pkg/storage/tsdb/config.go index 11b440af4a0..578d882c23e 100644 --- a/pkg/storage/tsdb/config.go +++ b/pkg/storage/tsdb/config.go @@ -55,6 +55,7 @@ var ( errUnsupportedStorageBackend = errors.New("unsupported TSDB storage backend") errInvalidShipConcurrency = errors.New("invalid TSDB ship concurrency") + errInvalidOpeningConcurrency = errors.New("invalid TSDB opening concurrency") errInvalidCompactionInterval = errors.New("invalid TSDB compaction interval") errInvalidCompactionConcurrency = errors.New("invalid TSDB compaction concurrency") errInvalidStripeSize = errors.New("invalid TSDB stripe size") @@ -209,6 +210,10 @@ func (cfg *TSDBConfig) Validate() error { return errInvalidShipConcurrency } + if cfg.MaxTSDBOpeningConcurrencyOnStartup <= 0 { + return errInvalidOpeningConcurrency + } + if cfg.HeadCompactionInterval <= 0 || cfg.HeadCompactionInterval > 5*time.Minute { return errInvalidCompactionInterval } diff --git a/pkg/storage/tsdb/config_test.go b/pkg/storage/tsdb/config_test.go index 63a0d5870f9..4883081ce92 100644 --- a/pkg/storage/tsdb/config_test.go +++ b/pkg/storage/tsdb/config_test.go @@ -6,275 +6,105 @@ import ( "time" "github.com/stretchr/testify/assert" + + "github.com/cortexproject/cortex/pkg/util/flagext" ) func TestConfig_Validate(t *testing.T) { t.Parallel() tests := map[string]struct { - config BlocksStorageConfig + setup func(*BlocksStorageConfig) expectedErr error }{ "should pass on S3 backend": { - config: BlocksStorageConfig{ - Bucket: BucketConfig{ - Backend: "s3", - }, - BucketStore: BucketStoreConfig{ - IndexCache: IndexCacheConfig{ - Backend: "inmemory", - }, - }, - TSDB: TSDBConfig{ - HeadCompactionInterval: 1 * time.Minute, - HeadCompactionConcurrency: 5, - StripeSize: 2, - BlockRanges: DurationList{1 * time.Minute}, - }, + setup: func(cfg *BlocksStorageConfig) { + cfg.Bucket.Backend = "s3" }, expectedErr: nil, }, "should pass on GCS backend": { - config: BlocksStorageConfig{ - Bucket: BucketConfig{ - Backend: "gcs", - }, - BucketStore: BucketStoreConfig{ - IndexCache: IndexCacheConfig{ - Backend: "inmemory", - }, - }, - TSDB: TSDBConfig{ - HeadCompactionInterval: 1 * time.Minute, - HeadCompactionConcurrency: 5, - StripeSize: 2, - BlockRanges: DurationList{1 * time.Minute}, - }, + setup: func(cfg *BlocksStorageConfig) { + cfg.Bucket.Backend = "gcs" }, expectedErr: nil, }, "should fail on unknown storage backend": { - config: BlocksStorageConfig{ - Bucket: BucketConfig{ - Backend: "unknown", - }, - BucketStore: BucketStoreConfig{ - IndexCache: IndexCacheConfig{ - Backend: "inmemory", - }, - }, - TSDB: TSDBConfig{ - StripeSize: 2, - BlockRanges: DurationList{1 * time.Minute}, - }, + setup: func(cfg *BlocksStorageConfig) { + cfg.Bucket.Backend = "unknown" }, expectedErr: errUnsupportedStorageBackend, }, "should fail on invalid ship concurrency": { - config: BlocksStorageConfig{ - Bucket: BucketConfig{ - Backend: "s3", - }, - BucketStore: BucketStoreConfig{ - IndexCache: IndexCacheConfig{ - Backend: "inmemory", - }, - }, - TSDB: TSDBConfig{ - ShipInterval: time.Minute, - ShipConcurrency: 0, - StripeSize: 2, - BlockRanges: DurationList{1 * time.Minute}, - }, + setup: func(cfg *BlocksStorageConfig) { + cfg.TSDB.ShipConcurrency = 0 }, expectedErr: errInvalidShipConcurrency, }, "should pass on invalid ship concurrency but shipping is disabled": { - config: BlocksStorageConfig{ - Bucket: BucketConfig{ - Backend: "s3", - }, - BucketStore: BucketStoreConfig{ - IndexCache: IndexCacheConfig{ - Backend: "inmemory", - }, - }, - TSDB: TSDBConfig{ - ShipInterval: 0, - ShipConcurrency: 0, - HeadCompactionInterval: 1 * time.Minute, - HeadCompactionConcurrency: 5, - StripeSize: 2, - BlockRanges: DurationList{1 * time.Minute}, - }, + setup: func(cfg *BlocksStorageConfig) { + cfg.TSDB.ShipConcurrency = 0 + cfg.TSDB.ShipInterval = 0 }, expectedErr: nil, }, + "should fail on invalid opening concurrency": { + setup: func(cfg *BlocksStorageConfig) { + cfg.TSDB.MaxTSDBOpeningConcurrencyOnStartup = 0 + }, + expectedErr: errInvalidOpeningConcurrency, + }, "should fail on invalid compaction interval": { - config: BlocksStorageConfig{ - Bucket: BucketConfig{ - Backend: "s3", - }, - BucketStore: BucketStoreConfig{ - IndexCache: IndexCacheConfig{ - Backend: "inmemory", - }, - }, - TSDB: TSDBConfig{ - HeadCompactionInterval: 0 * time.Minute, - StripeSize: 2, - BlockRanges: DurationList{1 * time.Minute}, - }, + setup: func(cfg *BlocksStorageConfig) { + cfg.TSDB.HeadCompactionInterval = 0 }, expectedErr: errInvalidCompactionInterval, }, "should fail on too high compaction interval": { - config: BlocksStorageConfig{ - Bucket: BucketConfig{ - Backend: "s3", - }, - BucketStore: BucketStoreConfig{ - IndexCache: IndexCacheConfig{ - Backend: "inmemory", - }, - }, - TSDB: TSDBConfig{ - HeadCompactionInterval: 10 * time.Minute, - StripeSize: 2, - BlockRanges: DurationList{1 * time.Minute}, - }, + setup: func(cfg *BlocksStorageConfig) { + cfg.TSDB.HeadCompactionInterval = 10 * time.Minute }, expectedErr: errInvalidCompactionInterval, }, "should fail on invalid compaction concurrency": { - config: BlocksStorageConfig{ - Bucket: BucketConfig{ - Backend: "s3", - }, - BucketStore: BucketStoreConfig{ - IndexCache: IndexCacheConfig{ - Backend: "inmemory", - }, - }, - TSDB: TSDBConfig{ - HeadCompactionInterval: time.Minute, - HeadCompactionConcurrency: 0, - StripeSize: 2, - BlockRanges: DurationList{1 * time.Minute}, - }, + setup: func(cfg *BlocksStorageConfig) { + cfg.TSDB.HeadCompactionConcurrency = 0 }, expectedErr: errInvalidCompactionConcurrency, }, - "should pass on on valid compaction config": { - config: BlocksStorageConfig{ - Bucket: BucketConfig{ - Backend: "s3", - }, - BucketStore: BucketStoreConfig{ - IndexCache: IndexCacheConfig{ - Backend: "inmemory", - }, - }, - TSDB: TSDBConfig{ - HeadCompactionInterval: time.Minute, - HeadCompactionConcurrency: 10, - StripeSize: 2, - BlockRanges: DurationList{1 * time.Minute}, - }, + "should pass on valid compaction concurrency": { + setup: func(cfg *BlocksStorageConfig) { + cfg.TSDB.HeadCompactionConcurrency = 10 }, expectedErr: nil, }, "should fail on negative stripe size": { - config: BlocksStorageConfig{ - Bucket: BucketConfig{ - Backend: "s3", - }, - BucketStore: BucketStoreConfig{ - IndexCache: IndexCacheConfig{ - Backend: "inmemory", - }, - }, - TSDB: TSDBConfig{ - HeadCompactionInterval: 1 * time.Minute, - HeadCompactionConcurrency: 5, - StripeSize: -2, - BlockRanges: DurationList{1 * time.Minute}, - }, + setup: func(cfg *BlocksStorageConfig) { + cfg.TSDB.StripeSize = -2 }, expectedErr: errInvalidStripeSize, }, "should fail on stripe size 0": { - config: BlocksStorageConfig{ - Bucket: BucketConfig{ - Backend: "s3", - }, - BucketStore: BucketStoreConfig{ - IndexCache: IndexCacheConfig{ - Backend: "inmemory", - }, - }, - TSDB: TSDBConfig{ - HeadCompactionInterval: 1 * time.Minute, - HeadCompactionConcurrency: 5, - StripeSize: 0, - BlockRanges: DurationList{1 * time.Minute}, - }, + setup: func(cfg *BlocksStorageConfig) { + cfg.TSDB.StripeSize = 0 }, expectedErr: errInvalidStripeSize, }, "should fail on stripe size 1": { - config: BlocksStorageConfig{ - Bucket: BucketConfig{ - Backend: "s3", - }, - BucketStore: BucketStoreConfig{ - IndexCache: IndexCacheConfig{ - Backend: "inmemory", - }, - }, - TSDB: TSDBConfig{ - HeadCompactionInterval: 1 * time.Minute, - HeadCompactionConcurrency: 5, - StripeSize: 1, - BlockRanges: DurationList{1 * time.Minute}, - }, + setup: func(cfg *BlocksStorageConfig) { + cfg.TSDB.StripeSize = 1 }, expectedErr: errInvalidStripeSize, }, - "should pass on stripe size": { - config: BlocksStorageConfig{ - Bucket: BucketConfig{ - Backend: "s3", - }, - BucketStore: BucketStoreConfig{ - IndexCache: IndexCacheConfig{ - Backend: "inmemory", - }, - }, - TSDB: TSDBConfig{ - HeadCompactionInterval: 1 * time.Minute, - HeadCompactionConcurrency: 5, - StripeSize: 1 << 14, - BlockRanges: DurationList{1 * time.Minute}, - }, + "should pass on valid stripe size": { + setup: func(cfg *BlocksStorageConfig) { + cfg.TSDB.StripeSize = 1 << 14 }, expectedErr: nil, }, "should fail on empty block ranges": { - config: BlocksStorageConfig{ - Bucket: BucketConfig{ - Backend: "s3", - }, - TSDB: TSDBConfig{ - HeadCompactionInterval: 1 * time.Minute, - HeadCompactionConcurrency: 5, - StripeSize: 8, - }, - BucketStore: BucketStoreConfig{ - IndexCache: IndexCacheConfig{ - Backend: "inmemory", - }, - }, + setup: func(cfg *BlocksStorageConfig) { + cfg.TSDB.BlockRanges = nil }, expectedErr: errEmptyBlockranges, }, @@ -284,7 +114,11 @@ func TestConfig_Validate(t *testing.T) { testData := testData t.Run(testName, func(t *testing.T) { - actualErr := testData.config.Validate() + cfg := &BlocksStorageConfig{} + flagext.DefaultValues(cfg) + testData.setup(cfg) + + actualErr := cfg.Validate() assert.Equal(t, testData.expectedErr, actualErr) }) }