diff --git a/CHANGELOG.md b/CHANGELOG.md index 06108d2271d..6e078d282e4 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -95,6 +95,7 @@ * [BUGFIX] Fixed gRPC connections leaking in rulers when rulers sharding is enabled and APIs called. #3314 * [BUGFIX] Fixed shuffle sharding consistency when zone-awareness is enabled and the shard size is increased or instances in a new zone are added. #3299 * [BUGFIX] Fixed Gossip memberlist members joining when addresses are configured using DNS-based service discovery. #3360 +* [BUGFIX] Ingester: fail to start an ingester running the blocks storage, if unable to load any existing TSDB at startup. #3354 ## 1.4.0 / 2020-10-02 diff --git a/pkg/ingester/ingester_v2.go b/pkg/ingester/ingester_v2.go index 2f9f8395037..249361a88c0 100644 --- a/pkg/ingester/ingester_v2.go +++ b/pkg/ingester/ingester_v2.go @@ -19,6 +19,7 @@ import ( "github.com/prometheus/prometheus/pkg/labels" "github.com/prometheus/prometheus/storage" "github.com/prometheus/prometheus/tsdb" + tsdb_errors "github.com/prometheus/prometheus/tsdb/errors" "github.com/thanos-io/thanos/pkg/block/metadata" "github.com/thanos-io/thanos/pkg/objstore" "github.com/thanos-io/thanos/pkg/shipper" @@ -253,6 +254,9 @@ func NewV2ForFlusher(cfg Config, registerer prometheus.Registerer) (*Ingester, e func (i *Ingester) startingV2ForFlusher(ctx context.Context) error { if err := i.openExistingTSDB(ctx); err != nil { + // Try to rollback and close opened TSDBs before halting the ingester. + i.closeAllTSDB() + return errors.Wrap(err, "opening existing TSDBs") } @@ -262,6 +266,9 @@ func (i *Ingester) startingV2ForFlusher(ctx context.Context) error { func (i *Ingester) startingV2(ctx context.Context) error { if err := i.openExistingTSDB(ctx); err != nil { + // Try to rollback and close opened TSDBs before halting the ingester. + i.closeAllTSDB() + return errors.Wrap(err, "opening existing TSDBs") } @@ -1057,9 +1064,19 @@ func (i *Ingester) openExistingTSDB(ctx context.Context) error { wg := &sync.WaitGroup{} openGate := gate.New(i.cfg.BlocksStorageConfig.TSDB.MaxTSDBOpeningConcurrencyOnStartup) - err := filepath.Walk(i.cfg.BlocksStorageConfig.TSDB.Dir, func(path string, info os.FileInfo, err error) error { + // Keep track of all errors that could occur. + errs := tsdb_errors.MultiError{} + errsMx := sync.Mutex{} + + walkErr := filepath.Walk(i.cfg.BlocksStorageConfig.TSDB.Dir, func(path string, info os.FileInfo, err error) error { if err != nil { - return filepath.SkipDir + // If the root directory doesn't exist, we're OK (not needed to be created upfront). + if os.IsNotExist(err) && path == i.cfg.BlocksStorageConfig.TSDB.Dir { + return filepath.SkipDir + } + + level.Error(util.Logger).Log("msg", "an error occurred while iterating the filesystem storing TSDBs", "path", path, "err", err) + return errors.Wrapf(err, "an error occurred while iterating the filesystem storing TSDBs at %s", path) } // Skip root dir and all other files @@ -1071,18 +1088,19 @@ func (i *Ingester) openExistingTSDB(ctx context.Context) error { userID := info.Name() f, err := os.Open(path) if err != nil { - level.Error(util.Logger).Log("msg", "unable to open user TSDB dir", "err", err, "user", userID, "path", path) - return filepath.SkipDir + level.Error(util.Logger).Log("msg", "unable to open TSDB dir", "err", err, "user", userID, "path", path) + return errors.Wrapf(err, "unable to open TSDB dir %s for user %s", path, userID) } defer f.Close() // If the dir is empty skip it if _, err := f.Readdirnames(1); err != nil { - if err != io.EOF { - level.Error(util.Logger).Log("msg", "unable to read TSDB dir", "err", err, "user", userID, "path", path) + if err == io.EOF { + return filepath.SkipDir } - return filepath.SkipDir + level.Error(util.Logger).Log("msg", "unable to read TSDB dir", "err", err, "user", userID, "path", path) + return errors.Wrapf(err, "unable to read TSDB dir %s for user %s", path, userID) } // Limit the number of TSDB's opening concurrently. Start blocks until there's a free spot available or the context is cancelled. @@ -1100,7 +1118,11 @@ func (i *Ingester) openExistingTSDB(ctx context.Context) error { db, err := i.createTSDB(userID) if err != nil { - level.Error(util.Logger).Log("msg", "unable to open user TSDB", "err", err, "user", userID) + errsMx.Lock() + errs.Add(errors.Wrapf(err, "unable to open TSDB for user %s", userID)) + errsMx.Unlock() + + level.Error(util.Logger).Log("msg", "unable to open TSDB", "err", err, "user", userID) return } @@ -1114,14 +1136,23 @@ func (i *Ingester) openExistingTSDB(ctx context.Context) error { return filepath.SkipDir // Don't descend into directories }) + if walkErr != nil { + errsMx.Lock() + errs.Add(errors.Wrapf(walkErr, "unable to walk directory %s containing existing TSDBs", i.cfg.BlocksStorageConfig.TSDB.Dir)) + errsMx.Unlock() + } + // Wait for all opening routines to finish wg.Wait() - if err != nil { - level.Error(util.Logger).Log("msg", "error while opening existing TSDBs") - } else { + + // Ensure no error occurred. + if errs.Err() == nil { level.Info(util.Logger).Log("msg", "successfully opened existing TSDBs") + return nil } - return err + + level.Error(util.Logger).Log("msg", "error while opening existing TSDBs", "err", errs.Error()) + return errs.Err() } // numSeriesInTSDB returns the total number of in-memory series across all open TSDBs. diff --git a/pkg/ingester/ingester_v2_test.go b/pkg/ingester/ingester_v2_test.go index 96a33980654..aa59263a2e0 100644 --- a/pkg/ingester/ingester_v2_test.go +++ b/pkg/ingester/ingester_v2_test.go @@ -1596,47 +1596,66 @@ func newIngesterMockWithTSDBStorageAndLimits(ingesterCfg Config, limits validati return ingester, nil } -func TestIngester_v2LoadTSDBOnStartup(t *testing.T) { +func TestIngester_v2OpenExistingTSDBOnStartup(t *testing.T) { t.Parallel() tests := map[string]struct { - setup func(*testing.T, string) - check func(*testing.T, *Ingester) + setup func(*testing.T, string) + check func(*testing.T, *Ingester) + expectedErr string }{ - "empty user dir": { + "should not load TSDB if the user directory is empty": { setup: func(t *testing.T, dir string) { require.NoError(t, os.Mkdir(filepath.Join(dir, "user0"), 0700)) }, check: func(t *testing.T, i *Ingester) { - require.Empty(t, i.getTSDB("user0"), "tsdb created for empty user dir") + require.Nil(t, i.getTSDB("user0")) }, }, - "empty tsdbs": { + "should not load any TSDB if the root directory is empty": { setup: func(t *testing.T, dir string) {}, check: func(t *testing.T, i *Ingester) { - require.Zero(t, len(i.TSDBState.dbs), "user tsdb's were created on empty dir") + require.Zero(t, len(i.TSDBState.dbs)) }, }, - "missing tsdb dir": { + "should not load any TSDB is the root directory is missing": { setup: func(t *testing.T, dir string) { require.NoError(t, os.Remove(dir)) }, check: func(t *testing.T, i *Ingester) { - require.Zero(t, len(i.TSDBState.dbs), "user tsdb's were created on missing dir") + require.Zero(t, len(i.TSDBState.dbs)) }, }, - "populated user dirs with unpopulated": { + "should load TSDB for any non-empty user directory": { setup: func(t *testing.T, dir string) { require.NoError(t, os.MkdirAll(filepath.Join(dir, "user0", "dummy"), 0700)) require.NoError(t, os.MkdirAll(filepath.Join(dir, "user1", "dummy"), 0700)) require.NoError(t, os.Mkdir(filepath.Join(dir, "user2"), 0700)) }, check: func(t *testing.T, i *Ingester) { - require.NotNil(t, i.getTSDB("user0"), "tsdb not created for non-empty user dir") - require.NotNil(t, i.getTSDB("user1"), "tsdb not created for non-empty user dir") - require.Empty(t, i.getTSDB("user2"), "tsdb created for empty user dir") + require.Equal(t, 2, len(i.TSDBState.dbs)) + require.NotNil(t, i.getTSDB("user0")) + require.NotNil(t, i.getTSDB("user1")) + require.Nil(t, i.getTSDB("user2")) }, }, + "should fail and rollback if an error occur while loading any user's TSDB": { + setup: func(t *testing.T, dir string) { + // Create a fake TSDB on disk with an empty chunks head segment file (it's invalid and + // opening TSDB should fail). + require.NoError(t, os.MkdirAll(filepath.Join(dir, "user0", "wal", ""), 0700)) + require.NoError(t, os.MkdirAll(filepath.Join(dir, "user0", "chunks_head", ""), 0700)) + require.NoError(t, ioutil.WriteFile(filepath.Join(dir, "user0", "chunks_head", "00000001"), nil, 0700)) + + require.NoError(t, os.MkdirAll(filepath.Join(dir, "user1", "dummy"), 0700)) + }, + check: func(t *testing.T, i *Ingester) { + require.Equal(t, 0, len(i.TSDBState.dbs)) + require.Nil(t, i.getTSDB("user0")) + require.Nil(t, i.getTSDB("user1")) + }, + expectedErr: "unable to open TSDB for user user0", + }, } for name, test := range tests { @@ -1665,10 +1684,16 @@ func TestIngester_v2LoadTSDBOnStartup(t *testing.T) { ingester, err := NewV2(ingesterCfg, clientCfg, overrides, nil) require.NoError(t, err) - require.NoError(t, services.StartAndAwaitRunning(context.Background(), ingester)) - defer services.StopAndAwaitTerminated(context.Background(), ingester) //nolint:errcheck + startErr := services.StartAndAwaitRunning(context.Background(), ingester) + if testData.expectedErr == "" { + require.NoError(t, startErr) + } else { + require.Error(t, startErr) + assert.Contains(t, startErr.Error(), testData.expectedErr) + } + defer services.StopAndAwaitTerminated(context.Background(), ingester) //nolint:errcheck testData.check(t, ingester) }) }