Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Tests: Do not leave behind state goroutines #1349

Merged
merged 17 commits into from
Jun 9, 2020
40 changes: 36 additions & 4 deletions db.go
Original file line number Diff line number Diff line change
Expand Up @@ -291,6 +291,13 @@ func Open(opt Options) (db *DB, err error) {
orc: newOracle(opt),
pub: newPublisher(),
}
// Cleanup all the goroutines started by badger in case of an error.
defer func() {
if err != nil {
db.cleanup()
db = nil
}
}()

if opt.MaxCacheSize > 0 {
config := ristretto.Config{
Expand Down Expand Up @@ -322,7 +329,6 @@ func Open(opt Options) (db *DB, err error) {
return nil, errors.Wrap(err, "failed to create bf cache")
}
}

if db.opt.InMemory {
db.opt.SyncWrites = false
// If badger is running in memory mode, push everything into the LSM Tree.
Expand All @@ -337,7 +343,7 @@ func Open(opt Options) (db *DB, err error) {
}

if db.registry, err = OpenKeyRegistry(krOpt); err != nil {
return nil, err
return db, err
}
db.calculateSize()
db.closers.updateSize = y.NewCloser(1)
Expand All @@ -346,7 +352,7 @@ func Open(opt Options) (db *DB, err error) {

// newLevelsController potentially loads files in directory.
if db.lc, err = newLevelsController(db, &manifest); err != nil {
return nil, err
return db, err
}

// Initialize vlog struct.
Expand All @@ -366,7 +372,7 @@ func Open(opt Options) (db *DB, err error) {
// Need to pass with timestamp, lsm get removes the last 8 bytes and compares key
vs, err := db.get(headKey)
if err != nil {
return nil, errors.Wrap(err, "Retrieving head")
return db, errors.Wrap(err, "Retrieving head")
}
db.orc.nextTxnTs = vs.Version
var vptr valuePointer
Expand All @@ -378,6 +384,7 @@ func Open(opt Options) (db *DB, err error) {
go db.doWrites(replayCloser)

if err = db.vlog.open(db, vptr, db.replayFunction()); err != nil {
replayCloser.SignalAndWait()
return db, y.Wrapf(err, "During db.vlog.open")
}
replayCloser.SignalAndWait() // Wait for replay to be applied first.
Expand Down Expand Up @@ -408,6 +415,31 @@ func Open(opt Options) (db *DB, err error) {
return db, nil
}

// cleanup stops all the goroutines started by badger. This is used in open to
// cleanup goroutines in case of an error.
func (db *DB) cleanup() {
db.blockCache.Close()
db.bfCache.Close()
db.stopMemoryFlush()
db.stopCompactions()

if db.closers.updateSize != nil {
db.closers.updateSize.Signal()
}
if db.closers.valueGC != nil {
db.closers.valueGC.Signal()
}
if db.closers.writes != nil {
db.closers.writes.Signal()
}
if db.closers.pub != nil {
db.closers.pub.Signal()
}

db.orc.Stop()
db.vlog.Close()
}

// DataCacheMetrics returns the metrics for the underlying data cache.
func (db *DB) DataCacheMetrics() *ristretto.Metrics {
if db.blockCache != nil {
Expand Down
14 changes: 6 additions & 8 deletions db_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -1265,7 +1265,9 @@ func TestExpiry(t *testing.T) {
func TestExpiryImproperDBClose(t *testing.T) {
testReplay := func(opt Options) {

db0, err := Open(opt)
// L0 compaction doesn't affect the test in any way. It is set to allow
// graceful shutdown of db0.
db0, err := Open(opt.WithCompactL0OnClose(false))
require.NoError(t, err)

dur := 1 * time.Hour
Expand All @@ -1280,17 +1282,13 @@ func TestExpiryImproperDBClose(t *testing.T) {
// Simulate a crash by not closing db0, but releasing the locks.
if db0.dirLockGuard != nil {
require.NoError(t, db0.dirLockGuard.release())
db0.dirLockGuard = nil
}
if db0.valueDirGuard != nil {
require.NoError(t, db0.valueDirGuard.release())
db0.valueDirGuard = nil
}
// We need to close vlog to fix the vlog file size. On windows, the vlog file
// is truncated to 2*MaxVlogSize and if we don't close the vlog file, reopening
// it would return Truncate Required Error.
require.NoError(t, db0.vlog.Close())

require.NoError(t, db0.registry.Close())
require.NoError(t, db0.manifest.close())
require.NoError(t, db0.Close())

db1, err := Open(opt)
require.NoError(t, err)
Expand Down
9 changes: 9 additions & 0 deletions levels_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -538,6 +538,15 @@ func TestL0Stall(t *testing.T) {
t.Log("Timeout triggered")
// Mark this test as successful since L0 is in memory and the
// addition of new table to L0 is supposed to stall.

// Remove tables from level 0 so that the stalled
// compaction can make progress. This does not have any
// effect on the test. This is done so that the goroutine
// stuck on addLevel0Table can make progress and end.
db.lc.levels[0].Lock()
db.lc.levels[0].tables = nil
db.lc.levels[0].Unlock()
<-done
} else {
t.Fatal("Test didn't finish in time")
}
Expand Down
10 changes: 6 additions & 4 deletions managed_db_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -162,6 +162,7 @@ func TestDropAllTwice(t *testing.T) {

// Call DropAll again.
require.NoError(t, db.DropAll())
require.NoError(t, db.Close())
}
t.Run("disk mode", func(t *testing.T) {
dir, err := ioutil.TempDir("", "badger-test")
Expand All @@ -175,7 +176,6 @@ func TestDropAllTwice(t *testing.T) {
opts := getTestOptions("")
opts.InMemory = true
test(t, opts)

})
}

Expand Down Expand Up @@ -278,8 +278,9 @@ func TestDropReadOnly(t *testing.T) {
require.Equal(t, err, ErrWindowsNotSupported)
} else {
require.NoError(t, err)
require.Panics(t, func() { db2.DropAll() })
require.NoError(t, db2.Close())
}
require.Panics(t, func() { db2.DropAll() })
}

func TestWriteAfterClose(t *testing.T) {
Expand Down Expand Up @@ -523,8 +524,9 @@ func TestDropPrefixReadOnly(t *testing.T) {
require.Equal(t, err, ErrWindowsNotSupported)
} else {
require.NoError(t, err)
require.Panics(t, func() { db2.DropPrefix([]byte("key0")) })
require.NoError(t, db2.Close())
}
require.Panics(t, func() { db2.DropPrefix([]byte("key0")) })
}

func TestDropPrefixRace(t *testing.T) {
Expand Down Expand Up @@ -590,7 +592,7 @@ func TestDropPrefixRace(t *testing.T) {
after := numKeysManaged(db, math.MaxUint64)
t.Logf("Before: %d. After dropprefix: %d\n", before, after)
require.True(t, after < before)
db.Close()
require.NoError(t, db.Close())
}

func TestWriteBatchManagedMode(t *testing.T) {
Expand Down
2 changes: 1 addition & 1 deletion value.go
Original file line number Diff line number Diff line change
Expand Up @@ -1219,7 +1219,7 @@ func (lf *logFile) init() error {
}

func (vlog *valueLog) Close() error {
if vlog.db.opt.InMemory {
if vlog == nil || vlog.db == nil || vlog.db.opt.InMemory {
return nil
}
// close flushDiscardStats.
Expand Down
6 changes: 3 additions & 3 deletions value_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -762,6 +762,7 @@ func TestPenultimateLogCorruption(t *testing.T) {

db0, err := Open(opt)
require.NoError(t, err)
defer func() { require.NoError(t, db0.Close()) }()

h := testHelper{db: db0, t: t}
h.writeRange(0, 7)
Expand All @@ -780,13 +781,12 @@ func TestPenultimateLogCorruption(t *testing.T) {
// Simulate a crash by not closing db0, but releasing the locks.
if db0.dirLockGuard != nil {
require.NoError(t, db0.dirLockGuard.release())
db0.dirLockGuard = nil
}
if db0.valueDirGuard != nil {
require.NoError(t, db0.valueDirGuard.release())
db0.valueDirGuard = nil
}
require.NoError(t, db0.vlog.Close())
require.NoError(t, db0.manifest.close())
require.NoError(t, db0.registry.Close())

opt.Truncate = true
db1, err := Open(opt)
Expand Down