Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
149 changes: 75 additions & 74 deletions pkg/ingester/ingester_v2.go
Original file line number Diff line number Diff line change
Expand Up @@ -16,17 +16,16 @@ import (
"github.com/prometheus/client_golang/prometheus"
"github.com/prometheus/client_golang/prometheus/promauto"
"github.com/prometheus/common/model"
"github.com/prometheus/prometheus/pkg/gate"
"github.com/prometheus/prometheus/pkg/labels"
"github.com/prometheus/prometheus/storage"
"github.com/prometheus/prometheus/tsdb"
tsdb_errors "github.com/prometheus/prometheus/tsdb/errors"
"github.com/thanos-io/thanos/pkg/block/metadata"
"github.com/thanos-io/thanos/pkg/objstore"
"github.com/thanos-io/thanos/pkg/shipper"
"github.com/weaveworks/common/httpgrpc"
"github.com/weaveworks/common/user"
"go.uber.org/atomic"
"golang.org/x/sync/errgroup"

"github.com/cortexproject/cortex/pkg/ingester/client"
"github.com/cortexproject/cortex/pkg/ring"
Expand Down Expand Up @@ -1096,98 +1095,100 @@ func (i *Ingester) closeAllTSDB() {
// concurrently opening TSDB.
func (i *Ingester) openExistingTSDB(ctx context.Context) error {
level.Info(util.Logger).Log("msg", "opening existing TSDBs")
wg := &sync.WaitGroup{}
openGate := gate.New(i.cfg.BlocksStorageConfig.TSDB.MaxTSDBOpeningConcurrencyOnStartup)

// Keep track of all errors that could occur.
errs := tsdb_errors.MultiError{}
errsMx := sync.Mutex{}
queue := make(chan string)
group, groupCtx := errgroup.WithContext(ctx)

walkErr := filepath.Walk(i.cfg.BlocksStorageConfig.TSDB.Dir, func(path string, info os.FileInfo, err error) error {
if err != nil {
// If the root directory doesn't exist, we're OK (not needed to be created upfront).
if os.IsNotExist(err) && path == i.cfg.BlocksStorageConfig.TSDB.Dir {
return filepath.SkipDir
}
// Create a pool of workers which will open existing TSDBs.
for n := 0; n < i.cfg.BlocksStorageConfig.TSDB.MaxTSDBOpeningConcurrencyOnStartup; n++ {
group.Go(func() error {
for userID := range queue {
startTime := time.Now()

level.Error(util.Logger).Log("msg", "an error occurred while iterating the filesystem storing TSDBs", "path", path, "err", err)
return errors.Wrapf(err, "an error occurred while iterating the filesystem storing TSDBs at %s", path)
}
db, err := i.createTSDB(userID)
if err != nil {
level.Error(util.Logger).Log("msg", "unable to open TSDB", "err", err, "user", userID)
return errors.Wrapf(err, "unable to open TSDB for user %s", userID)
}

// Add the database to the map of user databases
i.userStatesMtx.Lock()
i.TSDBState.dbs[userID] = db
i.userStatesMtx.Unlock()
i.metrics.memUsers.Inc()

i.TSDBState.walReplayTime.Observe(time.Since(startTime).Seconds())
}

// Skip root dir and all other files
if path == i.cfg.BlocksStorageConfig.TSDB.Dir || !info.IsDir() {
return nil
}
})
}

// Top level directories are assumed to be user TSDBs
userID := info.Name()
f, err := os.Open(path)
if err != nil {
level.Error(util.Logger).Log("msg", "unable to open TSDB dir", "err", err, "user", userID, "path", path)
return errors.Wrapf(err, "unable to open TSDB dir %s for user %s", path, userID)
}
defer f.Close()
// Spawn a goroutine to find all users with a TSDB on the filesystem.
group.Go(func() error {
// Close the queue once filesystem walking is done.
defer close(queue)

walkErr := filepath.Walk(i.cfg.BlocksStorageConfig.TSDB.Dir, func(path string, info os.FileInfo, err error) error {
if err != nil {
// If the root directory doesn't exist, we're OK (not needed to be created upfront).
if os.IsNotExist(err) && path == i.cfg.BlocksStorageConfig.TSDB.Dir {
return filepath.SkipDir
}

// If the dir is empty skip it
if _, err := f.Readdirnames(1); err != nil {
if err == io.EOF {
return filepath.SkipDir
level.Error(util.Logger).Log("msg", "an error occurred while iterating the filesystem storing TSDBs", "path", path, "err", err)
return errors.Wrapf(err, "an error occurred while iterating the filesystem storing TSDBs at %s", path)
}

level.Error(util.Logger).Log("msg", "unable to read TSDB dir", "err", err, "user", userID, "path", path)
return errors.Wrapf(err, "unable to read TSDB dir %s for user %s", path, userID)
}
// Skip root dir and all other files
if path == i.cfg.BlocksStorageConfig.TSDB.Dir || !info.IsDir() {
return nil
}

// Limit the number of TSDB's opening concurrently. Start blocks until there's a free spot available or the context is cancelled.
if err := openGate.Start(ctx); err != nil {
return err
}
// Top level directories are assumed to be user TSDBs
userID := info.Name()
f, err := os.Open(path)
if err != nil {
level.Error(util.Logger).Log("msg", "unable to open TSDB dir", "err", err, "user", userID, "path", path)
return errors.Wrapf(err, "unable to open TSDB dir %s for user %s", path, userID)
}
defer f.Close()

wg.Add(1)
go func(userID string) {
defer wg.Done()
defer openGate.Done()
defer func(ts time.Time) {
i.TSDBState.walReplayTime.Observe(time.Since(ts).Seconds())
}(time.Now())
// If the dir is empty skip it
if _, err := f.Readdirnames(1); err != nil {
if err == io.EOF {
return filepath.SkipDir
}

db, err := i.createTSDB(userID)
if err != nil {
errsMx.Lock()
errs.Add(errors.Wrapf(err, "unable to open TSDB for user %s", userID))
errsMx.Unlock()
level.Error(util.Logger).Log("msg", "unable to read TSDB dir", "err", err, "user", userID, "path", path)
return errors.Wrapf(err, "unable to read TSDB dir %s for user %s", path, userID)
}

level.Error(util.Logger).Log("msg", "unable to open TSDB", "err", err, "user", userID)
return
// Enqueue the user to be processed.
select {
case queue <- userID:
// Nothing to do.
case <-groupCtx.Done():
// Interrupt in case a failure occurred in another goroutine.
return nil
}

// Add the database to the map of user databases
i.userStatesMtx.Lock()
i.TSDBState.dbs[userID] = db
i.userStatesMtx.Unlock()
i.metrics.memUsers.Inc()
}(userID)
// Don't descend into subdirectories.
return filepath.SkipDir
})

return filepath.SkipDir // Don't descend into directories
return errors.Wrapf(walkErr, "unable to walk directory %s containing existing TSDBs", i.cfg.BlocksStorageConfig.TSDB.Dir)
})

if walkErr != nil {
errsMx.Lock()
errs.Add(errors.Wrapf(walkErr, "unable to walk directory %s containing existing TSDBs", i.cfg.BlocksStorageConfig.TSDB.Dir))
errsMx.Unlock()
}

// Wait for all opening routines to finish
wg.Wait()

// Ensure no error occurred.
if errs.Err() == nil {
level.Info(util.Logger).Log("msg", "successfully opened existing TSDBs")
return nil
// Wait for all workers to complete.
err := group.Wait()
if err != nil {
level.Error(util.Logger).Log("msg", "error while opening existing TSDBs", "err", err)
return err
}

level.Error(util.Logger).Log("msg", "error while opening existing TSDBs", "err", errs.Error())
return errs.Err()
level.Info(util.Logger).Log("msg", "successfully opened existing TSDBs")
return nil
}

// numSeriesInTSDB returns the total number of in-memory series across all open TSDBs.
Expand Down
53 changes: 51 additions & 2 deletions pkg/ingester/ingester_v2_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -1602,11 +1602,13 @@ func TestIngester_v2OpenExistingTSDBOnStartup(t *testing.T) {
t.Parallel()

tests := map[string]struct {
concurrency int
setup func(*testing.T, string)
check func(*testing.T, *Ingester)
expectedErr string
}{
"should not load TSDB if the user directory is empty": {
concurrency: 10,
setup: func(t *testing.T, dir string) {
require.NoError(t, os.Mkdir(filepath.Join(dir, "user0"), 0700))
},
Expand All @@ -1615,12 +1617,14 @@ func TestIngester_v2OpenExistingTSDBOnStartup(t *testing.T) {
},
},
"should not load any TSDB if the root directory is empty": {
setup: func(t *testing.T, dir string) {},
concurrency: 10,
setup: func(t *testing.T, dir string) {},
check: func(t *testing.T, i *Ingester) {
require.Zero(t, len(i.TSDBState.dbs))
},
},
"should not load any TSDB is the root directory is missing": {
concurrency: 10,
setup: func(t *testing.T, dir string) {
require.NoError(t, os.Remove(dir))
},
Expand All @@ -1629,6 +1633,7 @@ func TestIngester_v2OpenExistingTSDBOnStartup(t *testing.T) {
},
},
"should load TSDB for any non-empty user directory": {
concurrency: 10,
setup: func(t *testing.T, dir string) {
require.NoError(t, os.MkdirAll(filepath.Join(dir, "user0", "dummy"), 0700))
require.NoError(t, os.MkdirAll(filepath.Join(dir, "user1", "dummy"), 0700))
Expand All @@ -1641,7 +1646,26 @@ func TestIngester_v2OpenExistingTSDBOnStartup(t *testing.T) {
require.Nil(t, i.getTSDB("user2"))
},
},
"should fail and rollback if an error occur while loading any user's TSDB": {
"should load all TSDBs on concurrency < number of TSDBs": {
concurrency: 2,
setup: func(t *testing.T, dir string) {
require.NoError(t, os.MkdirAll(filepath.Join(dir, "user0", "dummy"), 0700))
require.NoError(t, os.MkdirAll(filepath.Join(dir, "user1", "dummy"), 0700))
require.NoError(t, os.MkdirAll(filepath.Join(dir, "user2", "dummy"), 0700))
require.NoError(t, os.MkdirAll(filepath.Join(dir, "user3", "dummy"), 0700))
require.NoError(t, os.MkdirAll(filepath.Join(dir, "user4", "dummy"), 0700))
},
check: func(t *testing.T, i *Ingester) {
require.Equal(t, 5, len(i.TSDBState.dbs))
require.NotNil(t, i.getTSDB("user0"))
require.NotNil(t, i.getTSDB("user1"))
require.NotNil(t, i.getTSDB("user2"))
require.NotNil(t, i.getTSDB("user3"))
require.NotNil(t, i.getTSDB("user4"))
},
},
"should fail and rollback if an error occur while loading a TSDB on concurrency > number of TSDBs": {
concurrency: 10,
setup: func(t *testing.T, dir string) {
// Create a fake TSDB on disk with an empty chunks head segment file (it's invalid and
// opening TSDB should fail).
Expand All @@ -1658,6 +1682,30 @@ func TestIngester_v2OpenExistingTSDBOnStartup(t *testing.T) {
},
expectedErr: "unable to open TSDB for user user0",
},
"should fail and rollback if an error occur while loading a TSDB on concurrency < number of TSDBs": {
concurrency: 2,
setup: func(t *testing.T, dir string) {
require.NoError(t, os.MkdirAll(filepath.Join(dir, "user0", "dummy"), 0700))
require.NoError(t, os.MkdirAll(filepath.Join(dir, "user1", "dummy"), 0700))
require.NoError(t, os.MkdirAll(filepath.Join(dir, "user3", "dummy"), 0700))
require.NoError(t, os.MkdirAll(filepath.Join(dir, "user4", "dummy"), 0700))

// Create a fake TSDB on disk with an empty chunks head segment file (it's invalid and
// opening TSDB should fail).
require.NoError(t, os.MkdirAll(filepath.Join(dir, "user2", "wal", ""), 0700))
require.NoError(t, os.MkdirAll(filepath.Join(dir, "user2", "chunks_head", ""), 0700))
require.NoError(t, ioutil.WriteFile(filepath.Join(dir, "user2", "chunks_head", "00000001"), nil, 0700))
},
check: func(t *testing.T, i *Ingester) {
require.Equal(t, 0, len(i.TSDBState.dbs))
require.Nil(t, i.getTSDB("user0"))
require.Nil(t, i.getTSDB("user1"))
require.Nil(t, i.getTSDB("user2"))
require.Nil(t, i.getTSDB("user3"))
require.Nil(t, i.getTSDB("user4"))
},
expectedErr: "unable to open TSDB for user user2",
},
}

for name, test := range tests {
Expand All @@ -1678,6 +1726,7 @@ func TestIngester_v2OpenExistingTSDBOnStartup(t *testing.T) {
ingesterCfg := defaultIngesterTestConfig()
ingesterCfg.BlocksStorageEnabled = true
ingesterCfg.BlocksStorageConfig.TSDB.Dir = tempDir
ingesterCfg.BlocksStorageConfig.TSDB.MaxTSDBOpeningConcurrencyOnStartup = testData.concurrency
ingesterCfg.BlocksStorageConfig.Bucket.Backend = "s3"
ingesterCfg.BlocksStorageConfig.Bucket.S3.Endpoint = "localhost"

Expand Down
5 changes: 5 additions & 0 deletions pkg/storage/tsdb/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -55,6 +55,7 @@ var (

errUnsupportedStorageBackend = errors.New("unsupported TSDB storage backend")
errInvalidShipConcurrency = errors.New("invalid TSDB ship concurrency")
errInvalidOpeningConcurrency = errors.New("invalid TSDB opening concurrency")
errInvalidCompactionInterval = errors.New("invalid TSDB compaction interval")
errInvalidCompactionConcurrency = errors.New("invalid TSDB compaction concurrency")
errInvalidStripeSize = errors.New("invalid TSDB stripe size")
Expand Down Expand Up @@ -209,6 +210,10 @@ func (cfg *TSDBConfig) Validate() error {
return errInvalidShipConcurrency
}

if cfg.MaxTSDBOpeningConcurrencyOnStartup <= 0 {
return errInvalidOpeningConcurrency
}

if cfg.HeadCompactionInterval <= 0 || cfg.HeadCompactionInterval > 5*time.Minute {
return errInvalidCompactionInterval
}
Expand Down
Loading