Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -95,6 +95,7 @@
* [BUGFIX] Fixed gRPC connections leaking in rulers when rulers sharding is enabled and APIs called. #3314
* [BUGFIX] Fixed shuffle sharding consistency when zone-awareness is enabled and the shard size is increased or instances in a new zone are added. #3299
* [BUGFIX] Fixed Gossip memberlist members joining when addresses are configured using DNS-based service discovery. #3360
* [BUGFIX] Ingester: fail to start an ingester running the blocks storage, if unable to load any existing TSDB at startup. #3354

## 1.4.0 / 2020-10-02

Expand Down
55 changes: 43 additions & 12 deletions pkg/ingester/ingester_v2.go
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@ import (
"github.com/prometheus/prometheus/pkg/labels"
"github.com/prometheus/prometheus/storage"
"github.com/prometheus/prometheus/tsdb"
tsdb_errors "github.com/prometheus/prometheus/tsdb/errors"
"github.com/thanos-io/thanos/pkg/block/metadata"
"github.com/thanos-io/thanos/pkg/objstore"
"github.com/thanos-io/thanos/pkg/shipper"
Expand Down Expand Up @@ -253,6 +254,9 @@ func NewV2ForFlusher(cfg Config, registerer prometheus.Registerer) (*Ingester, e

func (i *Ingester) startingV2ForFlusher(ctx context.Context) error {
if err := i.openExistingTSDB(ctx); err != nil {
// Try to rollback and close opened TSDBs before halting the ingester.
i.closeAllTSDB()

return errors.Wrap(err, "opening existing TSDBs")
}

Expand All @@ -262,6 +266,9 @@ func (i *Ingester) startingV2ForFlusher(ctx context.Context) error {

func (i *Ingester) startingV2(ctx context.Context) error {
if err := i.openExistingTSDB(ctx); err != nil {
// Try to rollback and close opened TSDBs before halting the ingester.
i.closeAllTSDB()

return errors.Wrap(err, "opening existing TSDBs")
}

Expand Down Expand Up @@ -1057,9 +1064,19 @@ func (i *Ingester) openExistingTSDB(ctx context.Context) error {
wg := &sync.WaitGroup{}
openGate := gate.New(i.cfg.BlocksStorageConfig.TSDB.MaxTSDBOpeningConcurrencyOnStartup)

err := filepath.Walk(i.cfg.BlocksStorageConfig.TSDB.Dir, func(path string, info os.FileInfo, err error) error {
// Keep track of all errors that could occur.
errs := tsdb_errors.MultiError{}
errsMx := sync.Mutex{}

walkErr := filepath.Walk(i.cfg.BlocksStorageConfig.TSDB.Dir, func(path string, info os.FileInfo, err error) error {
if err != nil {
return filepath.SkipDir
// If the root directory doesn't exist, we're OK (not needed to be created upfront).
if os.IsNotExist(err) && path == i.cfg.BlocksStorageConfig.TSDB.Dir {
return filepath.SkipDir
}

level.Error(util.Logger).Log("msg", "an error occurred while iterating the filesystem storing TSDBs", "path", path, "err", err)
return errors.Wrapf(err, "an error occurred while iterating the filesystem storing TSDBs at %s", path)
}

// Skip root dir and all other files
Expand All @@ -1071,18 +1088,19 @@ func (i *Ingester) openExistingTSDB(ctx context.Context) error {
userID := info.Name()
f, err := os.Open(path)
if err != nil {
level.Error(util.Logger).Log("msg", "unable to open user TSDB dir", "err", err, "user", userID, "path", path)
return filepath.SkipDir
level.Error(util.Logger).Log("msg", "unable to open TSDB dir", "err", err, "user", userID, "path", path)
return errors.Wrapf(err, "unable to open TSDB dir %s for user %s", path, userID)
}
defer f.Close()

// If the dir is empty skip it
if _, err := f.Readdirnames(1); err != nil {
if err != io.EOF {
level.Error(util.Logger).Log("msg", "unable to read TSDB dir", "err", err, "user", userID, "path", path)
if err == io.EOF {
return filepath.SkipDir
}

return filepath.SkipDir
level.Error(util.Logger).Log("msg", "unable to read TSDB dir", "err", err, "user", userID, "path", path)
return errors.Wrapf(err, "unable to read TSDB dir %s for user %s", path, userID)
}

// Limit the number of TSDB's opening concurrently. Start blocks until there's a free spot available or the context is cancelled.
Expand All @@ -1100,7 +1118,11 @@ func (i *Ingester) openExistingTSDB(ctx context.Context) error {

db, err := i.createTSDB(userID)
if err != nil {
level.Error(util.Logger).Log("msg", "unable to open user TSDB", "err", err, "user", userID)
errsMx.Lock()
errs.Add(errors.Wrapf(err, "unable to open TSDB for user %s", userID))
errsMx.Unlock()

level.Error(util.Logger).Log("msg", "unable to open TSDB", "err", err, "user", userID)
return
}

Expand All @@ -1114,14 +1136,23 @@ func (i *Ingester) openExistingTSDB(ctx context.Context) error {
return filepath.SkipDir // Don't descend into directories
})

if walkErr != nil {
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This cannot happen currently, since walkFn will filter out any error. (see other comment)

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Given I addressed the other comment, why can't it happen? Walk() interrupts and returns the error as soon as we return an error. Errors returned by Walk() itself are not filtered again via Walk().

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Now it can.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Even before it could, in case os.Open(path) or f.Readdirnames(1) failed.

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

You are right.

errsMx.Lock()
errs.Add(errors.Wrapf(walkErr, "unable to walk directory %s containing existing TSDBs", i.cfg.BlocksStorageConfig.TSDB.Dir))
errsMx.Unlock()
}

// Wait for all opening routines to finish
wg.Wait()
if err != nil {
level.Error(util.Logger).Log("msg", "error while opening existing TSDBs")
} else {

// Ensure no error occurred.
if errs.Err() == nil {
level.Info(util.Logger).Log("msg", "successfully opened existing TSDBs")
return nil
}
return err

level.Error(util.Logger).Log("msg", "error while opening existing TSDBs", "err", errs.Error())
return errs.Err()
}

// numSeriesInTSDB returns the total number of in-memory series across all open TSDBs.
Expand Down
55 changes: 40 additions & 15 deletions pkg/ingester/ingester_v2_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -1596,47 +1596,66 @@ func newIngesterMockWithTSDBStorageAndLimits(ingesterCfg Config, limits validati
return ingester, nil
}

func TestIngester_v2LoadTSDBOnStartup(t *testing.T) {
func TestIngester_v2OpenExistingTSDBOnStartup(t *testing.T) {
t.Parallel()

tests := map[string]struct {
setup func(*testing.T, string)
check func(*testing.T, *Ingester)
setup func(*testing.T, string)
check func(*testing.T, *Ingester)
expectedErr string
}{
"empty user dir": {
"should not load TSDB if the user directory is empty": {
setup: func(t *testing.T, dir string) {
require.NoError(t, os.Mkdir(filepath.Join(dir, "user0"), 0700))
},
check: func(t *testing.T, i *Ingester) {
require.Empty(t, i.getTSDB("user0"), "tsdb created for empty user dir")
require.Nil(t, i.getTSDB("user0"))
},
},
"empty tsdbs": {
"should not load any TSDB if the root directory is empty": {
setup: func(t *testing.T, dir string) {},
check: func(t *testing.T, i *Ingester) {
require.Zero(t, len(i.TSDBState.dbs), "user tsdb's were created on empty dir")
require.Zero(t, len(i.TSDBState.dbs))
},
},
"missing tsdb dir": {
"should not load any TSDB is the root directory is missing": {
setup: func(t *testing.T, dir string) {
require.NoError(t, os.Remove(dir))
},
check: func(t *testing.T, i *Ingester) {
require.Zero(t, len(i.TSDBState.dbs), "user tsdb's were created on missing dir")
require.Zero(t, len(i.TSDBState.dbs))
},
},
"populated user dirs with unpopulated": {
"should load TSDB for any non-empty user directory": {
setup: func(t *testing.T, dir string) {
require.NoError(t, os.MkdirAll(filepath.Join(dir, "user0", "dummy"), 0700))
require.NoError(t, os.MkdirAll(filepath.Join(dir, "user1", "dummy"), 0700))
require.NoError(t, os.Mkdir(filepath.Join(dir, "user2"), 0700))
},
check: func(t *testing.T, i *Ingester) {
require.NotNil(t, i.getTSDB("user0"), "tsdb not created for non-empty user dir")
require.NotNil(t, i.getTSDB("user1"), "tsdb not created for non-empty user dir")
require.Empty(t, i.getTSDB("user2"), "tsdb created for empty user dir")
require.Equal(t, 2, len(i.TSDBState.dbs))
require.NotNil(t, i.getTSDB("user0"))
require.NotNil(t, i.getTSDB("user1"))
require.Nil(t, i.getTSDB("user2"))
},
},
"should fail and rollback if an error occur while loading any user's TSDB": {
setup: func(t *testing.T, dir string) {
// Create a fake TSDB on disk with an empty chunks head segment file (it's invalid and
// opening TSDB should fail).
require.NoError(t, os.MkdirAll(filepath.Join(dir, "user0", "wal", ""), 0700))
require.NoError(t, os.MkdirAll(filepath.Join(dir, "user0", "chunks_head", ""), 0700))
require.NoError(t, ioutil.WriteFile(filepath.Join(dir, "user0", "chunks_head", "00000001"), nil, 0700))

require.NoError(t, os.MkdirAll(filepath.Join(dir, "user1", "dummy"), 0700))
},
check: func(t *testing.T, i *Ingester) {
require.Equal(t, 0, len(i.TSDBState.dbs))
require.Nil(t, i.getTSDB("user0"))
require.Nil(t, i.getTSDB("user1"))
},
expectedErr: "unable to open TSDB for user user0",
},
}

for name, test := range tests {
Expand Down Expand Up @@ -1665,10 +1684,16 @@ func TestIngester_v2LoadTSDBOnStartup(t *testing.T) {

ingester, err := NewV2(ingesterCfg, clientCfg, overrides, nil)
require.NoError(t, err)
require.NoError(t, services.StartAndAwaitRunning(context.Background(), ingester))

defer services.StopAndAwaitTerminated(context.Background(), ingester) //nolint:errcheck
startErr := services.StartAndAwaitRunning(context.Background(), ingester)
if testData.expectedErr == "" {
require.NoError(t, startErr)
} else {
require.Error(t, startErr)
assert.Contains(t, startErr.Error(), testData.expectedErr)
}

defer services.StopAndAwaitTerminated(context.Background(), ingester) //nolint:errcheck
testData.check(t, ingester)
})
}
Expand Down