Skip to content

Commit

Permalink
DropPrefix: Return error on blocked writes (#1329)
Browse files Browse the repository at this point in the history
  • Loading branch information
Ibrahim Jarif authored May 15, 2020
1 parent aadda9a commit ef28ef3
Show file tree
Hide file tree
Showing 2 changed files with 77 additions and 8 deletions.
28 changes: 20 additions & 8 deletions db.go
Original file line number Diff line number Diff line change
Expand Up @@ -1459,13 +1459,16 @@ func (db *DB) Flatten(workers int) error {
}
}

func (db *DB) blockWrite() {
func (db *DB) blockWrite() error {
// Stop accepting new writes.
atomic.StoreInt32(&db.blockWrites, 1)
if !atomic.CompareAndSwapInt32(&db.blockWrites, 0, 1) {
return ErrBlockedWrites
}

// Make all pending writes finish. The following will also close writeCh.
db.closers.writes.SignalAndWait()
db.opt.Infof("Writes flushed. Stopping compactions now...")
return nil
}

func (db *DB) unblockWrite() {
Expand All @@ -1476,14 +1479,16 @@ func (db *DB) unblockWrite() {
atomic.StoreInt32(&db.blockWrites, 0)
}

func (db *DB) prepareToDrop() func() {
func (db *DB) prepareToDrop() (func(), error) {
if db.opt.ReadOnly {
panic("Attempting to drop data in read-only mode.")
}
// In order prepare for drop, we need to block the incoming writes and
// write it to db. Then, flush all the pending flushtask. So that, we
// don't miss any entries.
db.blockWrite()
if err := db.blockWrite(); err != nil {
return nil, err
}
reqs := make([]*request, 0, 10)
for {
select {
Expand All @@ -1498,7 +1503,7 @@ func (db *DB) prepareToDrop() func() {
db.opt.Infof("Resuming writes")
db.startMemoryFlush()
db.unblockWrite()
}
}, nil
}
}
}
Expand All @@ -1516,16 +1521,19 @@ func (db *DB) prepareToDrop() func() {
// writes are paused before running DropAll, and resumed after it is finished.
func (db *DB) DropAll() error {
f, err := db.dropAll()
defer f()
if err != nil {
return err
}
defer f()
return nil
}

func (db *DB) dropAll() (func(), error) {
db.opt.Infof("DropAll called. Blocking writes...")
f := db.prepareToDrop()
f, err := db.prepareToDrop()
if err != nil {
return f, err
}
// prepareToDrop will stop all the incomming write and flushes any pending flush tasks.
// Before we drop, we'll stop the compaction because anyways all the datas are going to
// be deleted.
Expand Down Expand Up @@ -1577,7 +1585,11 @@ func (db *DB) dropAll() (func(), error) {
// - Compact rest of the levels, Li->Li, picking tables which have Kp.
// - Resume memtable flushes, compactions and writes.
func (db *DB) DropPrefix(prefix []byte) error {
f := db.prepareToDrop()
db.opt.Infof("DropPrefix Called")
f, err := db.prepareToDrop()
if err != nil {
return err
}
defer f()
// Block all foreign interactions with memory tables.
db.Lock()
Expand Down
57 changes: 57 additions & 0 deletions db2_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -29,7 +29,9 @@ import (
"path"
"regexp"
"runtime"
"sync"
"testing"
"time"

"github.com/dgraph-io/badger/v2/options"
"github.com/dgraph-io/badger/v2/pb"
Expand Down Expand Up @@ -776,3 +778,58 @@ func TestWindowsDataLoss(t *testing.T) {
}
require.ElementsMatch(t, keyList, result)
}

func TestDropAllDropPrefix(t *testing.T) {
key := func(i int) []byte {
return []byte(fmt.Sprintf("%10d", i))
}
val := func(i int) []byte {
return []byte(fmt.Sprintf("%128d", i))
}
runBadgerTest(t, nil, func(t *testing.T, db *DB) {
wb := db.NewWriteBatch()
defer wb.Cancel()

N := 50000

for i := 0; i < N; i++ {
require.NoError(t, wb.Set(key(i), val(i)))
}
require.NoError(t, wb.Flush())

var wg sync.WaitGroup
wg.Add(3)
go func() {
defer wg.Done()
err := db.DropPrefix([]byte("000"))
for err == ErrBlockedWrites {
fmt.Printf("DropPrefix 000 err: %v", err)
err = db.DropPrefix([]byte("000"))
time.Sleep(time.Millisecond * 500)
}
require.NoError(t, err)
}()
go func() {
defer wg.Done()
err := db.DropPrefix([]byte("111"))
for err == ErrBlockedWrites {
fmt.Printf("DropPrefix 111 err: %v", err)
err = db.DropPrefix([]byte("111"))
time.Sleep(time.Millisecond * 500)
}
require.NoError(t, err)
}()
go func() {
time.Sleep(time.Millisecond) // Let drop prefix run first.
defer wg.Done()
err := db.DropAll()
for err == ErrBlockedWrites {
fmt.Printf("dropAll err: %v", err)
err = db.DropAll()
time.Sleep(time.Millisecond * 300)
}
require.NoError(t, err)
}()
wg.Wait()
})
}

0 comments on commit ef28ef3

Please sign in to comment.