
Commit

address manish's comments
NamanJain8 committed May 3, 2021
1 parent c63236a commit 6490e0d
Showing 3 changed files with 99 additions and 69 deletions.
140 changes: 75 additions & 65 deletions db.go
@@ -1842,7 +1842,7 @@ func (db *DB) dropAll() (func(), error) {
// It does this in the following way:
// - Stream the keys under the given prefixes at the maximum version.
// - Write a single delete marker per key to a skiplist at (latest version + 1) and hand
//   that skiplist over to the DB.
func (db *DB) DropPrefixNonBlocking(ts uint64, prefixes ...[]byte) error {
func (db *DB) DropPrefixNonBlocking(prefixes ...[]byte) error {
if db.opt.ReadOnly {
return errors.New("Attempting to drop data in read-only mode.")
}
@@ -1852,86 +1852,84 @@ func (db *DB) DropPrefixNonBlocking(ts uint64, prefixes ...[]byte) error {
}
db.opt.Infof("Non-blocking DropPrefix called for %s", prefixes)

cbuf := z.NewBuffer(int(db.opt.MemTableSize), "DropPrefixNonBlocking")
defer cbuf.Release()

var wg sync.WaitGroup
handover := func(force bool) error {
if !force && int64(cbuf.LenNoPadding()) < db.opt.MemTableSize {
return nil
}

var kvs []*pb.KV
err := cbuf.SliceIterate(func(slice []byte) error {
var kv pb.KV
if err := kv.Unmarshal(slice); err != nil {
return err
}
kvs = append(kvs, &kv)
return nil
})
if err != nil {
return err
}
cbuf.Reset()

// TODO: Maybe move the rest of this to a separate goroutine if it is slow.
// Sort the kvs, add them to the builder, and hand it over to DB.
b := skl.NewBuilder(db.opt.MemTableSize)
sort.Slice(kvs, func(i, j int) bool {
return bytes.Compare(kvs[i].Key, kvs[j].Key) < 0
})
for _, kv := range kvs {
b.Add(y.KeyWithTs(kv.Key, kv.Version+1), y.ValueStruct{Meta: bitDelete})
}
wg.Add(1)
return db.HandoverSkiplist(b.Skiplist(), wg.Done)
}

dropPrefix := func(prefix []byte) error {
stream := db.NewStreamAt(ts)
stream := db.NewStreamAt(math.MaxUint64)
stream.LogPrefix = fmt.Sprintf("Dropping prefix: %#x", prefix)
stream.Prefix = prefix
// Use the default implementation with some changes. We don't need anything except key.
// We don't need anything except key and version.
stream.KeyToList = func(key []byte, itr *Iterator) (*pb.KVList, error) {
if !itr.Valid() {
return nil, nil
}
item := itr.Item()
if item.IsDeletedOrExpired() {
return nil, nil
}
if !bytes.Equal(key, item.Key()) {
// Return on the encounter with another key.
return nil, nil
}

a := itr.Alloc
ka := a.Copy(key)

list := &pb.KVList{}
for ; itr.Valid(); itr.Next() {
item := itr.Item()
if item.IsDeletedOrExpired() {
break
}
if !bytes.Equal(key, item.Key()) {
// Break out on the first encounter with another key.
break
}

kv := y.NewKV(a)
kv.Key = ka
list.Kv = append(list.Kv, kv)

if db.opt.NumVersionsToKeep == 1 {
break
}

if item.DiscardEarlierVersions() {
break
}
}
// We need to generate only a single delete marker per key. All the versions for this
// key will be considered deleted if we delete the one at the highest version.
kv := y.NewKV(a)
kv.Key = ka
kv.Version = item.Version()
list.Kv = append(list.Kv, kv)
itr.Next()
return list, nil
}

var wg sync.WaitGroup
builderMap := make(map[uint32]*skl.Builder)
initSize := int64(float64(db.opt.MemTableSize) * 1.1)

handover := func(force bool) error {
for id, b := range builderMap {
sl := b.Skiplist()
if force || sl.MemSize() > db.opt.MemTableSize {
wg.Add(1)
if err := db.HandoverSkiplist(sl, wg.Done); err != nil {
return err
}
// Create a fresh builder.
builderMap[id] = skl.NewBuilder(initSize)
}
}
return nil
}

stream.Send = func(buf *z.Buffer) error {
err := buf.SliceIterate(func(s []byte) error {
var kv pb.KV
if err := kv.Unmarshal(s); err != nil {
return err
}
if _, ok := builderMap[kv.StreamId]; !ok {
builderMap[kv.StreamId] = skl.NewBuilder(initSize)
}
builderMap[kv.StreamId].Add(y.KeyWithTs(kv.Key, ts), y.ValueStruct{Meta: bitDelete})
return nil
})
if err != nil {
return err
}
sz := buf.LenNoPadding()
dst := cbuf.Allocate(sz)
y.AssertTrue(sz == copy(dst, buf.Bytes()))
return handover(false)
}
if err := stream.Orchestrate(context.Background()); err != nil {
return err
}
// Flush the remaining skiplists if any.
if err := handover(true); err != nil {
return err
}
wg.Wait()
return nil
return handover(true)
}

// Iterate over all the prefixes and logically drop them.
@@ -1940,9 +1938,21 @@ func (db *DB) DropPrefixNonBlocking(ts uint64, prefixes ...[]byte) error {
return errors.Wrapf(err, "While dropping prefix: %#x", prefix)
}
}

wg.Wait()
return nil
}
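A sketch for illustration, not part of the commit: the comment at the top of DropPrefixNonBlocking and the handover closure above boil down to one idea: for every key streamed under a prefix, add a single bitDelete marker at (latest version + 1), so every older version becomes invisible once the skiplist is handed over. Isolated, and reusing only the skl, y, and pb helpers visible in this hunk (buildDropSkiplist is a hypothetical name), that step looks roughly like this:

func buildDropSkiplist(kvs []*pb.KV, memTableSize int64) *skl.Skiplist {
	// Skiplist builders expect keys in sorted order.
	sort.Slice(kvs, func(i, j int) bool {
		return bytes.Compare(kvs[i].Key, kvs[j].Key) < 0
	})
	b := skl.NewBuilder(memTableSize)
	for _, kv := range kvs {
		// One delete marker per key, one version above the latest streamed
		// version, shadows all existing versions of that key.
		b.Add(y.KeyWithTs(kv.Key, kv.Version+1), y.ValueStruct{Meta: bitDelete})
	}
	return b.Skiplist()
}

The resulting skiplist would then be passed to db.HandoverSkiplist, exactly as the handover closure above does.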

// DropPrefix would drop all the keys with the provided prefix. Based on DB options, it either
// drops the prefixes by blocking writes or does a logical drop.
// See DropPrefixBlocking and DropPrefixNonBlocking for more information.
func (db *DB) DropPrefix(prefixes ...[]byte) error {
if db.opt.BlockWritesOnDrop {
return db.DropPrefixBlocking(prefixes...)
}
return db.DropPrefixNonBlocking(prefixes...)
}

// DropPrefixBlocking would drop all the keys with the provided prefix. It does this in the following way:
// - Stop accepting new writes.
// - Stop memtable flushes before acquiring lock. Because we're acquiring lock here
@@ -1954,7 +1964,7 @@ func (db *DB) DropPrefixNonBlocking(ts uint64, prefixes ...[]byte) error {
// - Compact L0->L1, skipping over Kp.
// - Compact rest of the levels, Li->Li, picking tables which have Kp.
// - Resume memtable flushes, compactions and writes.
func (db *DB) DropPrefix(prefixes ...[]byte) error {
func (db *DB) DropPrefixBlocking(prefixes ...[]byte) error {
if len(prefixes) == 0 {
return nil
}
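After these db.go changes, the caller-visible surface is: DropPrefix dispatches on Options.BlockWritesOnDrop, while DropPrefixBlocking and DropPrefixNonBlocking remain callable directly. A minimal usage sketch mirroring the managed-mode setup the updated tests below use; the import path, directory, and prefix are illustrative, not taken from this commit:

package main

import (
	"log"

	badger "github.com/dgraph-io/badger/v3" // import path assumed for this branch
)

func main() {
	// Illustrative directory; WithBlockWritesOnDrop(false) selects the logical,
	// non-blocking drop path added by this PR.
	opt := badger.DefaultOptions("/tmp/badger-drop-example").WithBlockWritesOnDrop(false)

	// The tests use managed mode; DropPrefixNonBlocking streams at MaxUint64.
	db, err := badger.OpenManaged(opt)
	if err != nil {
		log.Fatal(err)
	}
	defer db.Close()

	// Dispatches to DropPrefixNonBlocking because BlockWritesOnDrop is false;
	// with the option set to true, DropPrefix would call DropPrefixBlocking.
	if err := db.DropPrefix([]byte("aa")); err != nil {
		log.Fatal(err)
	}
}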
11 changes: 7 additions & 4 deletions db2_test.go
@@ -1062,7 +1062,7 @@ func TestDropPrefixNonBlocking(t *testing.T) {
require.NoError(t, err)
defer removeDir(dir)

db, err := OpenManaged(DefaultOptions(dir))
db, err := OpenManaged(DefaultOptions(dir).WithBlockWritesOnDrop(false))
require.NoError(t, err)
defer db.Close()

@@ -1081,13 +1081,15 @@ func TestDropPrefixNonBlocking(t *testing.T) {

read := func() {
txn := db.NewTransactionAt(6, false)
defer txn.Discard()
iterOpts := DefaultIteratorOptions
iterOpts.Prefix = []byte("aa")
it := txn.NewIterator(iterOpts)
defer it.Close()

cnt := 0
for it.Rewind(); it.Valid(); it.Next() {
fmt.Printf("%+v", it.Item())
cnt++
}

@@ -1096,7 +1098,7 @@ func TestDropPrefixNonBlocking(t *testing.T) {

write()
prefixes := [][]byte{[]byte("aa")}
require.NoError(t, db.DropPrefixNonBlocking(5, prefixes...))
require.NoError(t, db.DropPrefix(prefixes...))
read()

// Writing again at same timestamp and verifying that vlog rewrites don't allow us to read
@@ -1135,20 +1137,21 @@ func TestDropPrefixNonBlockingNoError(t *testing.T) {
} else if !shouldFail {
require.NoError(t, err)
}
txn.Discard()
}
}
}

closer := z.NewCloser(1)
go writer(db, true, closer)
time.Sleep(time.Millisecond * 100)
require.NoError(t, db.DropPrefix([]byte("aa")))
require.NoError(t, db.DropPrefixBlocking([]byte("aa")))
closer.SignalAndWait()

closer2 := z.NewCloser(1)
go writer(db, false, closer2)
time.Sleep(time.Millisecond * 50)
prefixes := [][]byte{[]byte("aa")}
require.NoError(t, db.DropPrefixNonBlocking(atomic.AddUint64(&clock, 1), prefixes...))
require.NoError(t, db.DropPrefixNonBlocking(prefixes...))
closer2.SignalAndWait()
}
17 changes: 17 additions & 0 deletions options.go
@@ -104,6 +104,9 @@ type Options struct {
// ChecksumVerificationMode decides when db should verify checksums for SSTable blocks.
ChecksumVerificationMode options.ChecksumVerificationMode

// BlockWritesOnDrop determines whether DropPrefix will be blocking or non-blocking.
BlockWritesOnDrop bool

// DetectConflicts determines whether the transactions would be checked for
// conflicts. The transactions can be processed at a higher rate when
// conflict detection is disabled.
@@ -674,6 +677,20 @@ func (opt Options) WithChecksumVerificationMode(cvMode options.ChecksumVerificat
return opt
}

// WithBlockWritesOnDrop returns a new Options value with BlockWritesOnDrop set to the given value.
//
// BlockWritesOnDrop indicates whether a call to DropPrefix should block writes.
// When set to false, DropPrefix does a logical delete and does not block writes,
// although it will not immediately clear up the LSM tree.
// When set to true, DropPrefix blocks writes and clears up the LSM tree.
//
// The default value of BlockWritesOnDrop is false.
func (opt Options) WithBlockWritesOnDrop(b bool) Options {
opt.BlockWritesOnDrop = b
return opt
}

// WithBlockCacheSize returns a new Options value with BlockCacheSize set to the given value.
//
// This value specifies how much data cache should hold in memory. A small size
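To make the WithBlockWritesOnDrop doc comment above concrete, a small sketch of the two configurations; the directories and the exampleDropModes name are illustrative, and whether DefaultOptions overrides the zero value elsewhere is not visible in this diff:

func exampleDropModes() (Options, Options) {
	// Non-blocking: DropPrefix does a logical delete and keeps accepting writes.
	// Per the doc comment above, this matches the zero-value default.
	nonBlocking := DefaultOptions("/tmp/badger-nb").WithBlockWritesOnDrop(false)

	// Blocking: DropPrefix stops writes and clears the dropped prefixes from the
	// LSM tree before returning.
	blocking := DefaultOptions("/tmp/badger-b").WithBlockWritesOnDrop(true)

	return nonBlocking, blocking
}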
