From a1bae270e3cc759675a899355bc6ecf00fe4135e Mon Sep 17 00:00:00 2001
From: NamanJain8 <jnaman806@gmail.com>
Date: Thu, 29 Apr 2021 18:32:54 +0530
Subject: [PATCH 1/7] add DropPrefixNonBlocking API

---
 db.go | 83 ++++++++++++++++++++++++++++++++++++++++++++++++++++++++---
 1 file changed, 80 insertions(+), 3 deletions(-)

diff --git a/db.go b/db.go
index 09fc432cc..33f6f34fc 100644
--- a/db.go
+++ b/db.go
@@ -1036,10 +1036,9 @@ func (db *DB) HandoverSkiplist(skl *skl.Skiplist, callback func()) error {
 
 	// Iterate over the skiplist and send the entries to the publisher.
 	it := skl.NewIterator()
-	it.SeekToFirst()
 
 	var entries []*Entry
-	for it.Valid() {
+	for it.SeekToFirst(); it.Valid(); it.Next() {
 		v := it.Value()
 		e := &Entry{
 			Key:       it.Key(),
@@ -1048,7 +1047,6 @@ func (db *DB) HandoverSkiplist(skl *skl.Skiplist, callback func()) error {
 			UserMeta:  v.UserMeta,
 		}
 		entries = append(entries, e)
-		it.Next()
 	}
 	req := &request{
 		Entries: entries,
@@ -1836,6 +1834,85 @@ func (db *DB) dropAll() (func(), error) {
 	return resume, nil
 }
 
+// DropPrefixNonBlocking would logically drop all the keys with the provided prefix. The data would
+// not be cleared from the LSM tree immediately. It would be deleted eventually through compactions.
+// This operation is useful when we don't want to block writes while we delete the prefixes.
+// It does this in the following way:
+// - Stream the given prefixes at a given timestamp.
+// - Write them to skiplist and handover that skiplist to DB.
+func (db *DB) DropPrefixNonBlocking(readTs uint64, prefixes ...[]byte) error {
+	if db.opt.ReadOnly {
+		panic("Attempting to drop data in read-only mode.")
+	}
+
+	if len(prefixes) == 0 {
+		return nil
+	}
+	db.opt.Infof("Non-blocking DropPrefix called for %s", prefixes)
+
+	dropPrefix := func(prefix []byte) error {
+		stream := db.NewStreamAt(readTs)
+		stream.LogPrefix = fmt.Sprintf("Dropping prefix: %#x", prefix)
+		stream.Prefix = prefix
+		// Use the default implementation with some changes. We don't need anything except key.
+		stream.KeyToList = func(key []byte, itr *Iterator) (*pb.KVList, error) {
+			a := itr.Alloc
+			ka := a.Copy(key)
+
+			list := &pb.KVList{}
+			for ; itr.Valid(); itr.Next() {
+				item := itr.Item()
+				if item.IsDeletedOrExpired() {
+					break
+				}
+				if !bytes.Equal(key, item.Key()) {
+					// Break out on the first encounter with another key.
+					break
+				}
+
+				kv := y.NewKV(a)
+				kv.Key = ka
+				list.Kv = append(list.Kv, kv)
+
+				if db.opt.NumVersionsToKeep == 1 {
+					break
+				}
+
+				if item.DiscardEarlierVersions() {
+					break
+				}
+			}
+			return list, nil
+		}
+		stream.Send = func(buf *z.Buffer) error {
+			// Stream framework already batches the key values.
+			b := skl.NewBuilder(1 << 20) // TODO: Maybe figure out the optimal value.
+			err := buf.SliceIterate(func(s []byte) error {
+				var kv pb.KV
+				if err := kv.Unmarshal(s); err != nil {
+					return err
+				}
+				b.Add(y.KeyWithTs(kv.Key, readTs), y.ValueStruct{Meta: bitDelete})
+				return nil
+			})
+			if err != nil {
+				return err
+			}
+			return db.HandoverSkiplist(b.Skiplist(), nil)
+		}
+
+		return stream.Orchestrate(context.Background())
+	}
+
+	// Iterate over all the prefixes and logically drop them.
+	for _, prefix := range prefixes {
+		if err := dropPrefix(prefix); err != nil {
+			return errors.Wrapf(err, "While dropping prefix: %#x", prefix)
+		}
+	}
+	return nil
+}
+
 // DropPrefix would drop all the keys with the provided prefix. It does this in the following way:
 // - Stop accepting new writes.
 // - Stop memtable flushes before acquiring lock. Because we're acquring lock here

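A minimal caller-side sketch of the API added above, as it stands in this patch (the explicit
timestamp parameter is dropped later in the series). The directory path, the timestamp, and the
v3 import path are assumptions for illustration, not part of the patch:

package main

import (
	"log"

	badger "github.com/dgraph-io/badger/v3"
)

func main() {
	// Managed mode, matching the tests added in the next patch.
	db, err := badger.OpenManaged(badger.DefaultOptions("/tmp/badger"))
	if err != nil {
		log.Fatal(err)
	}
	defer db.Close()

	// Logically drop every key under the "aa" prefix as seen at read timestamp 5.
	// Writes are not blocked; the dropped data is reclaimed later by compactions.
	if err := db.DropPrefixNonBlocking(5, []byte("aa")); err != nil {
		log.Fatal(err)
	}
}
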
From fbad83a6dcca4213bc58f5054a59f26f6492f31c Mon Sep 17 00:00:00 2001
From: Rohan Prasad <prasad.rohan93@gmail.com>
Date: Fri, 30 Apr 2021 14:00:19 +0530
Subject: [PATCH 2/7] Add tests for DropPrefixNonBlocking

---
 db2_test.go | 97 +++++++++++++++++++++++++++++++++++++++++++++++++++++
 1 file changed, 97 insertions(+)

diff --git a/db2_test.go b/db2_test.go
index d6c86ec6f..4f7c3cae0 100644
--- a/db2_test.go
+++ b/db2_test.go
@@ -31,6 +31,7 @@ import (
 	"regexp"
 	"runtime"
 	"sync"
+	"sync/atomic"
 	"testing"
 	"time"
 
@@ -1055,3 +1056,99 @@ func TestKeyCount(t *testing.T) {
 	require.NoError(t, stream.Orchestrate(context.Background()))
 	require.Equal(t, N, uint64(count))
 }
+
+func TestDropPrefixNonBlocking(t *testing.T) {
+	dir, err := ioutil.TempDir("", "badger-test")
+	require.NoError(t, err)
+	defer removeDir(dir)
+
+	db, err := OpenManaged(DefaultOptions(dir))
+	require.NoError(t, err)
+	defer db.Close()
+
+	val := []byte("value")
+
+	// Insert key-values
+	write := func() {
+		txn := db.NewTransactionAt(1, true)
+		defer txn.Discard()
+		require.NoError(t, txn.Set([]byte("aaa"), val))
+		require.NoError(t, txn.Set([]byte("aab"), val))
+		require.NoError(t, txn.Set([]byte("aba"), val))
+		require.NoError(t, txn.Set([]byte("aca"), val))
+		require.NoError(t, txn.CommitAt(2, nil))
+	}
+
+	read := func() {
+		txn := db.NewTransactionAt(6, false)
+		iterOpts := DefaultIteratorOptions
+		iterOpts.Prefix = []byte("aa")
+		it := txn.NewIterator(iterOpts)
+		defer it.Close()
+
+		cnt := 0
+		for it.Rewind(); it.Valid(); it.Next() {
+			cnt++
+		}
+
+		require.Equal(t, 0, cnt)
+	}
+
+	write()
+	prefixes := [][]byte{[]byte("aa")}
+	require.NoError(t, db.DropPrefixNonBlocking(5, prefixes...))
+	read()
+
+	// Writing again at the same timestamp and verifying that vlog rewrites don't allow us to read
+	// these entries anyway.
+	write()
+	read()
+}
+
+func TestDropPrefixNonBlockingNoError(t *testing.T) {
+	dir, err := ioutil.TempDir("", "badger-test")
+	require.NoError(t, err)
+	defer removeDir(dir)
+
+	opt := DefaultOptions(dir)
+	db, err := OpenManaged(opt)
+	require.NoError(t, err)
+	defer db.Close()
+
+	clock := uint64(1)
+
+	writer := func(db *DB, shouldFail bool, closer *z.Closer) {
+		val := []byte("value")
+		defer closer.Done()
+		// Insert key-values
+		for {
+			select {
+			case <-closer.HasBeenClosed():
+				return
+			default:
+				txn := db.NewTransactionAt(atomic.AddUint64(&clock, 1), true)
+				require.NoError(t, txn.SetEntry(NewEntry([]byte("aaa"), val)))
+
+				err := txn.CommitAt(atomic.AddUint64(&clock, 1), nil)
+				if shouldFail && err != nil {
+					require.Error(t, err, ErrBlockedWrites)
+				} else if !shouldFail {
+					require.NoError(t, err)
+				}
+			}
+		}
+	}
+
+	closer := z.NewCloser(1)
+	go writer(db, true, closer)
+	time.Sleep(time.Millisecond * 100)
+	require.NoError(t, db.DropPrefix([]byte("aa")))
+	closer.SignalAndWait()
+
+	closer2 := z.NewCloser(1)
+	go writer(db, false, closer2)
+	time.Sleep(time.Millisecond * 50)
+	prefixes := [][]byte{[]byte("aa")}
+	require.NoError(t, db.DropPrefixNonBlocking(atomic.AddUint64(&clock, 1), prefixes...))
+	closer2.SignalAndWait()
+}

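The second test leans on the fact that a blocking drop rejects concurrent commits with
ErrBlockedWrites, while the non-blocking drop keeps accepting them. A sketch of how a
managed-mode caller could retry around a blocking drop; the helper name, the timestamps, and
the v3 import path are illustrative, not part of the patch:

package retryexample

import (
	"errors"
	"time"

	badger "github.com/dgraph-io/badger/v3"
)

// writeWithRetry retries the whole transaction while writes are blocked by a
// blocking DropPrefix or DropAll.
func writeWithRetry(db *badger.DB, readTs, commitTs uint64, key, val []byte) error {
	for {
		txn := db.NewTransactionAt(readTs, true)
		err := txn.SetEntry(badger.NewEntry(key, val))
		if err == nil {
			err = txn.CommitAt(commitTs, nil)
		}
		txn.Discard()
		if !errors.Is(err, badger.ErrBlockedWrites) {
			return err // nil on success, or a genuine failure
		}
		time.Sleep(10 * time.Millisecond) // wait for the drop to finish
	}
}
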
From 1b0fc7c06579afb18c11a64c0cd6c53f5c4fc721 Mon Sep 17 00:00:00 2001
From: NamanJain8 <jnaman806@gmail.com>
Date: Fri, 30 Apr 2021 18:14:40 +0530
Subject: [PATCH 3/7] Batch skiplist handovers and fix writeToLSM locking

---
 db.go | 52 +++++++++++++++++++++++++++++++++++++++++-----------
 1 file changed, 41 insertions(+), 11 deletions(-)

diff --git a/db.go b/db.go
index 33f6f34fc..beb6ed197 100644
--- a/db.go
+++ b/db.go
@@ -763,6 +763,8 @@ var requestPool = sync.Pool{
 }
 
 func (db *DB) writeToLSM(b *request) error {
+	db.lock.RLock()
+	defer db.lock.RUnlock()
 	for i, entry := range b.Entries {
 		var err error
 		if db.opt.managedTxns || entry.skipVlogAndSetThreshold(db.valueThreshold()) {
@@ -1838,11 +1840,11 @@ func (db *DB) dropAll() (func(), error) {
 // not be cleared from the LSM tree immediately. It would be deleted eventually through compactions.
 // This operation is useful when we don't want to block writes while we delete the prefixes.
 // It does this in the following way:
-// - Stream the given prefixes at a given timestamp.
-// - Write them to skiplist and handover that skiplist to DB.
-func (db *DB) DropPrefixNonBlocking(readTs uint64, prefixes ...[]byte) error {
+// - Stream the given prefixes at a given ts.
+// - Write them to skiplist at the specified ts and handover that skiplist to DB.
+func (db *DB) DropPrefixNonBlocking(ts uint64, prefixes ...[]byte) error {
 	if db.opt.ReadOnly {
-		panic("Attempting to drop data in read-only mode.")
+		return errors.New("Attempting to drop data in read-only mode.")
 	}
 
 	if len(prefixes) == 0 {
@@ -1851,7 +1853,7 @@ func (db *DB) DropPrefixNonBlocking(readTs uint64, prefixes ...[]byte) error {
 	db.opt.Infof("Non-blocking DropPrefix called for %s", prefixes)
 
 	dropPrefix := func(prefix []byte) error {
-		stream := db.NewStreamAt(readTs)
+		stream := db.NewStreamAt(ts)
 		stream.LogPrefix = fmt.Sprintf("Dropping prefix: %#x", prefix)
 		stream.Prefix = prefix
 		// Use the default implementation with some changes. We don't need anything except key.
@@ -1884,24 +1886,52 @@ func (db *DB) DropPrefixNonBlocking(readTs uint64, prefixes ...[]byte) error {
 			}
 			return list, nil
 		}
+
+		var wg sync.WaitGroup
+		builderMap := make(map[uint32]*skl.Builder)
+		initSize := int64(float64(db.opt.MemTableSize) * 1.1)
+
+		handover := func(force bool) error {
+			for id, b := range builderMap {
+				sl := b.Skiplist()
+				if force || sl.MemSize() > db.opt.MemTableSize {
+					wg.Add(1)
+					if err := db.HandoverSkiplist(sl, wg.Done); err != nil {
+						return err
+					}
+					// Create a fresh builder.
+					builderMap[id] = skl.NewBuilder(initSize)
+				}
+			}
+			return nil
+		}
+
 		stream.Send = func(buf *z.Buffer) error {
-			// Stream framework already batches the key values.
-			b := skl.NewBuilder(1 << 20) // TODO: Maybe figure out the optimal value.
 			err := buf.SliceIterate(func(s []byte) error {
 				var kv pb.KV
 				if err := kv.Unmarshal(s); err != nil {
 					return err
 				}
-				b.Add(y.KeyWithTs(kv.Key, readTs), y.ValueStruct{Meta: bitDelete})
+				if _, ok := builderMap[kv.StreamId]; !ok {
+					builderMap[kv.StreamId] = skl.NewBuilder(initSize)
+				}
+				builderMap[kv.StreamId].Add(y.KeyWithTs(kv.Key, ts), y.ValueStruct{Meta: bitDelete})
 				return nil
 			})
 			if err != nil {
 				return err
 			}
-			return db.HandoverSkiplist(b.Skiplist(), nil)
+			return handover(false)
 		}
-
-		return stream.Orchestrate(context.Background())
+		if err := stream.Orchestrate(context.Background()); err != nil {
+			return err
+		}
+		// Flush the remaining skiplists if any.
+		if err := handover(true); err != nil {
+			return err
+		}
+		wg.Wait()
+		return nil
 	}
 
 	// Iterate over all the prefixes and logically drop them.

From c63236a2530ea867c86d5dfc3cfec4a463e0c5be Mon Sep 17 00:00:00 2001
From: NamanJain8 <jnaman806@gmail.com>
Date: Mon, 3 May 2021 13:27:57 +0530
Subject: [PATCH 4/7] fix allocated size assertion

---
 table/builder.go | 12 +++++++++++-
 1 file changed, 11 insertions(+), 1 deletion(-)

diff --git a/table/builder.go b/table/builder.go
index 8322bb86f..55166cd11 100644
--- a/table/builder.go
+++ b/table/builder.go
@@ -153,6 +153,16 @@ func NewTableBuilder(opts Options) *Builder {
 	return b
 }
 
+func maxEncodedLen(ctype options.CompressionType, sz int) int {
+	switch ctype {
+	case options.Snappy:
+		return snappy.MaxEncodedLen(sz)
+	case options.ZSTD:
+		return y.ZSTDCompressBound(sz)
+	}
+	return sz
+}
+
 func (b *Builder) handleBlock() {
 	defer b.wg.Done()
 
@@ -175,7 +185,7 @@ func (b *Builder) handleBlock() {
 		// BlockBuf should always less than or equal to allocated space. If the blockBuf is greater
 		// than allocated space that means the data from this block cannot be stored in its
 		// existing location.
-		allocatedSpace := (item.end) + padding + 1
+		allocatedSpace := maxEncodedLen(b.opts.Compression, (item.end)) + padding + 1
 		y.AssertTrue(len(blockBuf) <= allocatedSpace)
 
 		// blockBuf was allocated on allocator. So, we don't need to copy it over.

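The assertion fix above matters because compression can expand incompressible data: the
compressed block may end up larger than item.end, so the check has to compare against the
codec's worst-case bound rather than the raw block size. A small standalone illustration using
the same Snappy helper (the block sizes are arbitrary):

package main

import (
	"fmt"

	"github.com/golang/snappy"
)

func main() {
	// snappy.MaxEncodedLen reports the worst-case output size for a given input
	// size; it is always somewhat larger than the input itself.
	for _, sz := range []int{4 << 10, 64 << 10} {
		fmt.Printf("block=%d bytes, snappy worst case=%d bytes\n", sz, snappy.MaxEncodedLen(sz))
	}
}
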
From b14dfefe5a4f8b6d0ddcea5c5da5632d0083152a Mon Sep 17 00:00:00 2001
From: NamanJain8 <jnaman806@gmail.com>
Date: Tue, 4 May 2021 00:28:42 +0530
Subject: [PATCH 5/7] Address Manish's review comments

---
 db.go       | 140 ++++++++++++++++++++++++++++------------------------
 db2_test.go |  11 +++--
 options.go  |  18 +++++++
 3 files changed, 100 insertions(+), 69 deletions(-)

diff --git a/db.go b/db.go
index beb6ed197..c1ecdd87f 100644
--- a/db.go
+++ b/db.go
@@ -1842,7 +1842,7 @@ func (db *DB) dropAll() (func(), error) {
 // It does this in the following way:
 // - Stream the given prefixes at a given ts.
 // - Write them to skiplist at the specified ts and handover that skiplist to DB.
-func (db *DB) DropPrefixNonBlocking(ts uint64, prefixes ...[]byte) error {
+func (db *DB) DropPrefixNonBlocking(prefixes ...[]byte) error {
 	if db.opt.ReadOnly {
 		return errors.New("Attempting to drop data in read-only mode.")
 	}
@@ -1852,86 +1852,84 @@ func (db *DB) DropPrefixNonBlocking(ts uint64, prefixes ...[]byte) error {
 	}
 	db.opt.Infof("Non-blocking DropPrefix called for %s", prefixes)
 
+	cbuf := z.NewBuffer(int(db.opt.MemTableSize), "DropPrefixNonBlocking")
+	defer cbuf.Release()
+
+	var wg sync.WaitGroup
+	handover := func(force bool) error {
+		if !force && int64(cbuf.LenNoPadding()) < db.opt.MemTableSize {
+			return nil
+		}
+
+		var kvs []*pb.KV
+		err := cbuf.SliceIterate(func(slice []byte) error {
+			var kv pb.KV
+			if err := kv.Unmarshal(slice); err != nil {
+				return err
+			}
+			kvs = append(kvs, &kv)
+			return nil
+		})
+		if err != nil {
+			return err
+		}
+		cbuf.Reset()
+
+		// TODO: Maybe move the rest of it to a separate goroutine if it is slow.
+		// Sort the kvs, add them to the builder, and hand it over to DB.
+		b := skl.NewBuilder(db.opt.MemTableSize)
+		sort.Slice(kvs, func(i, j int) bool {
+			return bytes.Compare(kvs[i].Key, kvs[j].Key) < 0
+		})
+		for _, kv := range kvs {
+			b.Add(y.KeyWithTs(kv.Key, kv.Version+1), y.ValueStruct{Meta: bitDelete})
+		}
+		wg.Add(1)
+		return db.HandoverSkiplist(b.Skiplist(), wg.Done)
+	}
+
 	dropPrefix := func(prefix []byte) error {
-		stream := db.NewStreamAt(ts)
+		stream := db.NewStreamAt(math.MaxUint64)
 		stream.LogPrefix = fmt.Sprintf("Dropping prefix: %#x", prefix)
 		stream.Prefix = prefix
-		// Use the default implementation with some changes. We don't need anything except key.
+		// We don't need anything except key and version.
 		stream.KeyToList = func(key []byte, itr *Iterator) (*pb.KVList, error) {
+			if !itr.Valid() {
+				return nil, nil
+			}
+			item := itr.Item()
+			if item.IsDeletedOrExpired() {
+				return nil, nil
+			}
+			if !bytes.Equal(key, item.Key()) {
+			// Return on encountering another key.
+				return nil, nil
+			}
+
 			a := itr.Alloc
 			ka := a.Copy(key)
-
 			list := &pb.KVList{}
-			for ; itr.Valid(); itr.Next() {
-				item := itr.Item()
-				if item.IsDeletedOrExpired() {
-					break
-				}
-				if !bytes.Equal(key, item.Key()) {
-					// Break out on the first encounter with another key.
-					break
-				}
-
-				kv := y.NewKV(a)
-				kv.Key = ka
-				list.Kv = append(list.Kv, kv)
-
-				if db.opt.NumVersionsToKeep == 1 {
-					break
-				}
-
-				if item.DiscardEarlierVersions() {
-					break
-				}
-			}
+			// We need to generate only a single delete marker per key. All the versions for this
+			// key will be considered deleted if we delete the one at the highest version.
+			kv := y.NewKV(a)
+			kv.Key = ka
+			kv.Version = item.Version()
+			list.Kv = append(list.Kv, kv)
+			itr.Next()
 			return list, nil
 		}
 
-		var wg sync.WaitGroup
-		builderMap := make(map[uint32]*skl.Builder)
-		initSize := int64(float64(db.opt.MemTableSize) * 1.1)
-
-		handover := func(force bool) error {
-			for id, b := range builderMap {
-				sl := b.Skiplist()
-				if force || sl.MemSize() > db.opt.MemTableSize {
-					wg.Add(1)
-					if err := db.HandoverSkiplist(sl, wg.Done); err != nil {
-						return err
-					}
-					// Create a fresh builder.
-					builderMap[id] = skl.NewBuilder(initSize)
-				}
-			}
-			return nil
-		}
-
 		stream.Send = func(buf *z.Buffer) error {
-			err := buf.SliceIterate(func(s []byte) error {
-				var kv pb.KV
-				if err := kv.Unmarshal(s); err != nil {
-					return err
-				}
-				if _, ok := builderMap[kv.StreamId]; !ok {
-					builderMap[kv.StreamId] = skl.NewBuilder(initSize)
-				}
-				builderMap[kv.StreamId].Add(y.KeyWithTs(kv.Key, ts), y.ValueStruct{Meta: bitDelete})
-				return nil
-			})
-			if err != nil {
-				return err
-			}
+			sz := buf.LenNoPadding()
+			dst := cbuf.Allocate(sz)
+			y.AssertTrue(sz == copy(dst, buf.Bytes()))
 			return handover(false)
 		}
 		if err := stream.Orchestrate(context.Background()); err != nil {
 			return err
 		}
 		// Flush the remaining skiplists if any.
-		if err := handover(true); err != nil {
-			return err
-		}
-		wg.Wait()
-		return nil
+		return handover(true)
 	}
 
 	// Iterate over all the prefixes and logically drop them.
@@ -1940,9 +1938,21 @@ func (db *DB) DropPrefixNonBlocking(ts uint64, prefixes ...[]byte) error {
 			return errors.Wrapf(err, "While dropping prefix: %#x", prefix)
 		}
 	}
+
+	wg.Wait()
 	return nil
 }
 
+// DropPrefix would drop all the keys with the provided prefix. Based on DB options, it either drops
+// the prefixes by blocking the writes or doing a logical drop.
+// See DropPrefixBlocking and DropPrefixNonBlocking for more information.
+func (db *DB) DropPrefix(prefixes ...[]byte) error {
+	if db.opt.BlockWritesOnDrop {
+		return db.DropPrefixBlocking(prefixes...)
+	}
+	return db.DropPrefixNonBlocking(prefixes...)
+}
+
 // DropPrefix would drop all the keys with the provided prefix. It does this in the following way:
 // - Stop accepting new writes.
 // - Stop memtable flushes before acquiring lock. Because we're acquring lock here
@@ -1954,7 +1964,7 @@ func (db *DB) DropPrefixNonBlocking(ts uint64, prefixes ...[]byte) error {
 // - Compact L0->L1, skipping over Kp.
 // - Compact rest of the levels, Li->Li, picking tables which have Kp.
 // - Resume memtable flushes, compactions and writes.
-func (db *DB) DropPrefix(prefixes ...[]byte) error {
+func (db *DB) DropPrefixBlocking(prefixes ...[]byte) error {
 	if len(prefixes) == 0 {
 		return nil
 	}
diff --git a/db2_test.go b/db2_test.go
index 4f7c3cae0..a814a211e 100644
--- a/db2_test.go
+++ b/db2_test.go
@@ -1062,7 +1062,7 @@ func TestDropPrefixNonBlocking(t *testing.T) {
 	require.NoError(t, err)
 	defer removeDir(dir)
 
-	db, err := OpenManaged(DefaultOptions(dir))
+	db, err := OpenManaged(DefaultOptions(dir).WithBlockWritesOnDrop(false))
 	require.NoError(t, err)
 	defer db.Close()
 
@@ -1081,6 +1081,7 @@ func TestDropPrefixNonBlocking(t *testing.T) {
 
 	read := func() {
 		txn := db.NewTransactionAt(6, false)
+		defer txn.Discard()
 		iterOpts := DefaultIteratorOptions
 		iterOpts.Prefix = []byte("aa")
 		it := txn.NewIterator(iterOpts)
@@ -1088,6 +1089,7 @@ func TestDropPrefixNonBlocking(t *testing.T) {
 
 		cnt := 0
 		for it.Rewind(); it.Valid(); it.Next() {
+			fmt.Printf("%+v", it.Item())
 			cnt++
 		}
 
@@ -1096,7 +1098,7 @@ func TestDropPrefixNonBlocking(t *testing.T) {
 
 	write()
 	prefixes := [][]byte{[]byte("aa")}
-	require.NoError(t, db.DropPrefixNonBlocking(5, prefixes...))
+	require.NoError(t, db.DropPrefix(prefixes...))
 	read()
 
 	// Writing again at same timestamp and verifying that vlog rewrites don't allow us to read
@@ -1135,6 +1137,7 @@ func TestDropPrefixNonBlockingNoError(t *testing.T) {
 				} else if !shouldFail {
 					require.NoError(t, err)
 				}
+				txn.Discard()
 			}
 		}
 	}
@@ -1142,13 +1145,13 @@ func TestDropPrefixNonBlockingNoError(t *testing.T) {
 	closer := z.NewCloser(1)
 	go writer(db, true, closer)
 	time.Sleep(time.Millisecond * 100)
-	require.NoError(t, db.DropPrefix([]byte("aa")))
+	require.NoError(t, db.DropPrefixBlocking([]byte("aa")))
 	closer.SignalAndWait()
 
 	closer2 := z.NewCloser(1)
 	go writer(db, false, closer2)
 	time.Sleep(time.Millisecond * 50)
 	prefixes := [][]byte{[]byte("aa")}
-	require.NoError(t, db.DropPrefixNonBlocking(atomic.AddUint64(&clock, 1), prefixes...))
+	require.NoError(t, db.DropPrefixNonBlocking(prefixes...))
 	closer2.SignalAndWait()
 }
diff --git a/options.go b/options.go
index 3f9ba3395..019e639f4 100644
--- a/options.go
+++ b/options.go
@@ -104,6 +104,9 @@ type Options struct {
 	// ChecksumVerificationMode decides when db should verify checksums for SSTable blocks.
 	ChecksumVerificationMode options.ChecksumVerificationMode
 
+	// BlockWritesOnDrop determines whether the DropPrefix will be blocking/non-blocking.
+	BlockWritesOnDrop bool
+
 	// DetectConflicts determines whether the transactions would be checked for
 	// conflicts. The transactions can be processed at a higher rate when
 	// conflict detection is disabled.
@@ -140,6 +143,7 @@ func DefaultOptions(path string) Options {
 		MaxLevels:           7,
 		NumGoroutines:       8,
 		MetricsEnabled:      true,
+		BlockWritesOnDrop:   true,
 
 		NumCompactors:           4, // Run at least 2 compactors. Zero-th compactor prioritizes L0.
 		NumLevelZeroTables:      5,
@@ -674,6 +678,20 @@ func (opt Options) WithChecksumVerificationMode(cvMode options.ChecksumVerificat
 	return opt
 }
 
+// WithBlockWritesOnDrop returns a new Options value with BlockWritesOnDrop set to the given value.
+//
+// BlockWritesOnDrop indicates whether the call to DropPrefix should block the writes or not.
+// When set to false, the DropPrefix will do a logical delete and will not block
+// the writes. However, this will not immediately clear up the LSM tree.
+// When set to true, the DropPrefix will block the writes and will clear up the LSM
+// tree.
+//
+// The default value of BlockWritesOnDrop is true.
+func (opt Options) WithBlockWritesOnDrop(b bool) Options {
+	opt.BlockWritesOnDrop = b
+	return opt
+}
+
 // WithBlockCacheSize returns a new Options value with BlockCacheSize set to the given value.
 //
 // This value specifies how much data cache should hold in memory. A small size

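With this patch, callers choose between the two behaviours through the new option and the
DropPrefix dispatcher (the option is renamed in the next patch). A minimal sketch; the directory
path and the v3 import path are assumptions for illustration:

package main

import (
	"log"

	badger "github.com/dgraph-io/badger/v3"
)

func main() {
	// Disable the blocking behaviour so that DropPrefix routes to DropPrefixNonBlocking.
	opt := badger.DefaultOptions("/tmp/badger").WithBlockWritesOnDrop(false)
	db, err := badger.OpenManaged(opt)
	if err != nil {
		log.Fatal(err)
	}
	defer db.Close()

	// Logical, non-blocking drop of everything under the "aa" prefix.
	if err := db.DropPrefix([]byte("aa")); err != nil {
		log.Fatal(err)
	}
}
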
From 74e15938c385420df2f3c4bd7b29bf7b5dd7ded7 Mon Sep 17 00:00:00 2001
From: NamanJain8 <jnaman806@gmail.com>
Date: Tue, 4 May 2021 23:26:05 +0530
Subject: [PATCH 6/7] Don't use Go memory for buffering dropped keys

---
 db.go       | 45 ++++++++++++++++++++++-----------------------
 db2_test.go |  7 +------
 go.mod      |  2 +-
 go.sum      |  6 ++++--
 options.go  | 16 ++++++++--------
 5 files changed, 36 insertions(+), 40 deletions(-)

diff --git a/db.go b/db.go
index c1ecdd87f..e05e22288 100644
--- a/db.go
+++ b/db.go
@@ -1861,29 +1861,20 @@ func (db *DB) DropPrefixNonBlocking(prefixes ...[]byte) error {
 			return nil
 		}
 
-		var kvs []*pb.KV
-		err := cbuf.SliceIterate(func(slice []byte) error {
-			var kv pb.KV
-			if err := kv.Unmarshal(slice); err != nil {
-				return err
-			}
-			kvs = append(kvs, &kv)
+		// Sort the kvs, add them to the builder, and hand it over to DB.
+		cbuf.SortSlice(func(left, right []byte) bool {
+			return y.CompareKeys(left, right) < 0
+		})
+
+		b := skl.NewBuilder(db.opt.MemTableSize)
+		err := cbuf.SliceIterate(func(s []byte) error {
+			b.Add(s, y.ValueStruct{Meta: bitDelete})
 			return nil
 		})
 		if err != nil {
 			return err
 		}
 		cbuf.Reset()
-
-		// TODO: Maybe move the rest of it to a separate goroutine if it is slow.
-		// Sort the kvs, add them to the builder, and hand it over to DB.
-		b := skl.NewBuilder(db.opt.MemTableSize)
-		sort.Slice(kvs, func(i, j int) bool {
-			return bytes.Compare(kvs[i].Key, kvs[j].Key) < 0
-		})
-		for _, kv := range kvs {
-			b.Add(y.KeyWithTs(kv.Key, kv.Version+1), y.ValueStruct{Meta: bitDelete})
-		}
 		wg.Add(1)
 		return db.HandoverSkiplist(b.Skiplist(), wg.Done)
 	}
@@ -1912,17 +1903,25 @@ func (db *DB) DropPrefixNonBlocking(prefixes ...[]byte) error {
 			// We need to generate only a single delete marker per key. All the versions for this
 			// key will be considered deleted if we delete the one at the highest version.
 			kv := y.NewKV(a)
-			kv.Key = ka
-			kv.Version = item.Version()
+			kv.Key = y.KeyWithTs(ka, item.Version())
 			list.Kv = append(list.Kv, kv)
 			itr.Next()
 			return list, nil
 		}
 
 		stream.Send = func(buf *z.Buffer) error {
-			sz := buf.LenNoPadding()
-			dst := cbuf.Allocate(sz)
-			y.AssertTrue(sz == copy(dst, buf.Bytes()))
+			kv := pb.KV{}
+			err := buf.SliceIterate(func(s []byte) error {
+				kv.Reset()
+				if err := kv.Unmarshal(s); err != nil {
+					return err
+				}
+				cbuf.WriteSlice(kv.Key)
+				return nil
+			})
+			if err != nil {
+				return err
+			}
 			return handover(false)
 		}
 		if err := stream.Orchestrate(context.Background()); err != nil {
@@ -1947,7 +1946,7 @@ func (db *DB) DropPrefixNonBlocking(prefixes ...[]byte) error {
 // the prefixes by blocking the writes or doing a logical drop.
 // See DropPrefixBlocking and DropPrefixNonBlocking for more information.
 func (db *DB) DropPrefix(prefixes ...[]byte) error {
-	if db.opt.BlockWritesOnDrop {
+	if db.opt.AllowStopTheWorld {
 		return db.DropPrefixBlocking(prefixes...)
 	}
 	return db.DropPrefixNonBlocking(prefixes...)
diff --git a/db2_test.go b/db2_test.go
index a814a211e..f92bc0963 100644
--- a/db2_test.go
+++ b/db2_test.go
@@ -1062,7 +1062,7 @@ func TestDropPrefixNonBlocking(t *testing.T) {
 	require.NoError(t, err)
 	defer removeDir(dir)
 
-	db, err := OpenManaged(DefaultOptions(dir).WithBlockWritesOnDrop(false))
+	db, err := OpenManaged(DefaultOptions(dir).WithAllowStopTheWorld(false))
 	require.NoError(t, err)
 	defer db.Close()
 
@@ -1100,11 +1100,6 @@ func TestDropPrefixNonBlocking(t *testing.T) {
 	prefixes := [][]byte{[]byte("aa")}
 	require.NoError(t, db.DropPrefix(prefixes...))
 	read()
-
-	// Writing again at the same timestamp and verifying that vlog rewrites don't allow us to read
-	// these entries anyway.
-	write()
-	read()
 }
 
 func TestDropPrefixNonBlockingNoError(t *testing.T) {
diff --git a/go.mod b/go.mod
index d94c4067e..648637012 100644
--- a/go.mod
+++ b/go.mod
@@ -7,7 +7,7 @@ go 1.12
 require (
 	github.com/DataDog/zstd v1.4.6-0.20210216161059-8cb8bacba7ba
 	github.com/cespare/xxhash v1.1.0
-	github.com/dgraph-io/ristretto v0.0.4-0.20210309073149-3836124cdc5a
+	github.com/dgraph-io/ristretto v0.0.4-0.20210504175135-20a958a7e034
 	github.com/dustin/go-humanize v1.0.0
 	github.com/gogo/protobuf v1.3.2
 	github.com/golang/protobuf v1.3.1
diff --git a/go.sum b/go.sum
index f14b4a123..fea436500 100644
--- a/go.sum
+++ b/go.sum
@@ -7,6 +7,8 @@ github.com/OneOfOne/xxhash v1.2.2/go.mod h1:HSdplMjZKSmBqAxg5vPj2TmRDmfkzw+cTzAE
 github.com/armon/consul-api v0.0.0-20180202201655-eb2c6b5be1b6/go.mod h1:grANhF5doyWs3UAsr3K4I6qtAmlQcZDesFNEHPZAzj8=
 github.com/cespare/xxhash v1.1.0 h1:a6HrQnmkObjyL+Gs60czilIUGqrzKutQD6XZog3p+ko=
 github.com/cespare/xxhash v1.1.0/go.mod h1:XrSqR1VqqWfGrhpAt58auRo0WTKS1nRRg3ghfAqPWnc=
+github.com/cespare/xxhash/v2 v2.1.1 h1:6MnRN8NT7+YBpUIWxHtefFZOKTAPgGjpQSxqLNn0+qY=
+github.com/cespare/xxhash/v2 v2.1.1/go.mod h1:VGX0DQ3Q6kWi7AoAeZDth3/j3BFtOZR5XLFGgcrjCOs=
 github.com/client9/misspell v0.3.4/go.mod h1:qj6jICC3Q7zFZvVWo7KLAzC3yx5G7kyvSDkc90ppPyw=
 github.com/coreos/etcd v3.3.10+incompatible/go.mod h1:uF7uidLiAD3TWHmW31ZFd/JWoc32PjwdhPthX9715RE=
 github.com/coreos/go-etcd v2.0.0+incompatible/go.mod h1:Jez6KQU2B/sWsbdaef3ED8NzMklzPG4d5KIOhIy30Tk=
@@ -15,8 +17,8 @@ github.com/cpuguy83/go-md2man v1.0.10/go.mod h1:SmD6nW6nTyfqj6ABTjUi3V3JVMnlJmwc
 github.com/davecgh/go-spew v1.1.0/go.mod h1:J7Y8YcW2NihsgmVo/mv3lAwl/skON4iLHjSsI+c5H38=
 github.com/davecgh/go-spew v1.1.1 h1:vj9j/u1bqnvCEfJOwUhtlOARqs3+rkHYY13jYWTU97c=
 github.com/davecgh/go-spew v1.1.1/go.mod h1:J7Y8YcW2NihsgmVo/mv3lAwl/skON4iLHjSsI+c5H38=
-github.com/dgraph-io/ristretto v0.0.4-0.20210309073149-3836124cdc5a h1:1cMMkx3iegOzbAxVl1ZZQRHk+gaCf33Y5/4I3l0NNSg=
-github.com/dgraph-io/ristretto v0.0.4-0.20210309073149-3836124cdc5a/go.mod h1:MIonLggsKgZLUSt414ExgwNtlOL5MuEoAJP514mwGe8=
+github.com/dgraph-io/ristretto v0.0.4-0.20210504175135-20a958a7e034 h1:WPhpbABd68W7GVsDEH2TlQOfd/2PQs9pczxp12oUiIw=
+github.com/dgraph-io/ristretto v0.0.4-0.20210504175135-20a958a7e034/go.mod h1:fux0lOrBhrVCJd3lcTHsIJhq1T2rokOu6v9Vcb3Q9ug=
 github.com/dgryski/go-farm v0.0.0-20190423205320-6a90982ecee2 h1:tdlZCpZ/P9DhczCTSixgIKmwPv6+wP5DGjqLYw5SUiA=
 github.com/dgryski/go-farm v0.0.0-20190423205320-6a90982ecee2/go.mod h1:SqUrOPUnsFjfmXRMNPybcSiG0BgUW2AuFH8PAnS2iTw=
 github.com/dustin/go-humanize v1.0.0 h1:VSnTsYCnlFHaM2/igO1h6X3HA71jcobQuxemgkq4zYo=
diff --git a/options.go b/options.go
index 019e639f4..0f6c6def7 100644
--- a/options.go
+++ b/options.go
@@ -104,8 +104,8 @@ type Options struct {
 	// ChecksumVerificationMode decides when db should verify checksums for SSTable blocks.
 	ChecksumVerificationMode options.ChecksumVerificationMode
 
-	// BlockWritesOnDrop determines whether the DropPrefix will be blocking/non-blocking.
-	BlockWritesOnDrop bool
+	// AllowStopTheWorld determines whether the DropPrefix will be blocking/non-blocking.
+	AllowStopTheWorld bool
 
 	// DetectConflicts determines whether the transactions would be checked for
 	// conflicts. The transactions can be processed at a higher rate when
@@ -143,7 +143,7 @@ func DefaultOptions(path string) Options {
 		MaxLevels:           7,
 		NumGoroutines:       8,
 		MetricsEnabled:      true,
-		BlockWritesOnDrop:   true,
+		AllowStopTheWorld:   true,
 
 		NumCompactors:           4, // Run at least 2 compactors. Zero-th compactor prioritizes L0.
 		NumLevelZeroTables:      5,
@@ -678,17 +678,17 @@ func (opt Options) WithChecksumVerificationMode(cvMode options.ChecksumVerificat
 	return opt
 }
 
-// WithBlockWritesOnDrop returns a new Options value with BlockWritesOnDrop set to the given value.
+// WithAllowStopTheWorld returns a new Options value with AllowStopTheWorld set to the given value.
 //
-// BlockWritesOnDrop indicates whether the call to DropPrefix should block the writes or not.
+// AllowStopTheWorld indicates whether the call to DropPrefix should block the writes or not.
 // When set to false, the DropPrefix will do a logical delete and will not block
 // the writes. However, this will not immediately clear up the LSM tree.
 // When set to true, the DropPrefix will block the writes and will clear up the LSM
 // tree.
 //
-// The default value of BlockWritesOnDrop is true.
-func (opt Options) WithBlockWritesOnDrop(b bool) Options {
-	opt.BlockWritesOnDrop = b
+// The default value of AllowStopTheWorld is true.
+func (opt Options) WithAllowStopTheWorld(b bool) Options {
+	opt.AllowStopTheWorld = b
 	return opt
 }
 

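This patch stops buffering the streamed keys in Go-allocated slices and keeps them in a z.Buffer
instead, sorting in place before building the skiplist. A small standalone sketch of that buffer
pattern, using only the z APIs the patch itself calls (the capacity and tag are illustrative):

package main

import (
	"bytes"
	"fmt"
	"log"

	"github.com/dgraph-io/ristretto/z"
)

func main() {
	buf := z.NewBuffer(1<<20, "example") // backed by manually managed memory
	defer buf.Release()

	for _, k := range [][]byte{[]byte("b"), []byte("c"), []byte("a")} {
		buf.WriteSlice(k) // stored as length-prefixed slices
	}
	// Sort the slices in place, then visit them in order.
	buf.SortSlice(func(left, right []byte) bool {
		return bytes.Compare(left, right) < 0
	})
	err := buf.SliceIterate(func(s []byte) error {
		fmt.Printf("%s\n", s) // prints a, b, c
		return nil
	})
	if err != nil {
		log.Fatal(err)
	}
}
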
From 54cc168cac2a67404c39f6845a8c8b48e4d09eb8 Mon Sep 17 00:00:00 2001
From: NamanJain8 <jnaman806@gmail.com>
Date: Wed, 5 May 2021 00:44:37 +0530
Subject: [PATCH 7/7] update ristretto

---
 go.mod | 2 +-
 go.sum | 4 ++--
 2 files changed, 3 insertions(+), 3 deletions(-)

diff --git a/go.mod b/go.mod
index 648637012..0e2a361e8 100644
--- a/go.mod
+++ b/go.mod
@@ -7,7 +7,7 @@ go 1.12
 require (
 	github.com/DataDog/zstd v1.4.6-0.20210216161059-8cb8bacba7ba
 	github.com/cespare/xxhash v1.1.0
-	github.com/dgraph-io/ristretto v0.0.4-0.20210504175135-20a958a7e034
+	github.com/dgraph-io/ristretto v0.0.4-0.20210504190834-0bf2acd73aa3
 	github.com/dustin/go-humanize v1.0.0
 	github.com/gogo/protobuf v1.3.2
 	github.com/golang/protobuf v1.3.1
diff --git a/go.sum b/go.sum
index fea436500..e97ba7656 100644
--- a/go.sum
+++ b/go.sum
@@ -17,8 +17,8 @@ github.com/cpuguy83/go-md2man v1.0.10/go.mod h1:SmD6nW6nTyfqj6ABTjUi3V3JVMnlJmwc
 github.com/davecgh/go-spew v1.1.0/go.mod h1:J7Y8YcW2NihsgmVo/mv3lAwl/skON4iLHjSsI+c5H38=
 github.com/davecgh/go-spew v1.1.1 h1:vj9j/u1bqnvCEfJOwUhtlOARqs3+rkHYY13jYWTU97c=
 github.com/davecgh/go-spew v1.1.1/go.mod h1:J7Y8YcW2NihsgmVo/mv3lAwl/skON4iLHjSsI+c5H38=
-github.com/dgraph-io/ristretto v0.0.4-0.20210504175135-20a958a7e034 h1:WPhpbABd68W7GVsDEH2TlQOfd/2PQs9pczxp12oUiIw=
-github.com/dgraph-io/ristretto v0.0.4-0.20210504175135-20a958a7e034/go.mod h1:fux0lOrBhrVCJd3lcTHsIJhq1T2rokOu6v9Vcb3Q9ug=
+github.com/dgraph-io/ristretto v0.0.4-0.20210504190834-0bf2acd73aa3 h1:jU/wpYsEL+8JPLf/QcjkQKI5g0dOjSuwcMjkThxt5x0=
+github.com/dgraph-io/ristretto v0.0.4-0.20210504190834-0bf2acd73aa3/go.mod h1:fux0lOrBhrVCJd3lcTHsIJhq1T2rokOu6v9Vcb3Q9ug=
 github.com/dgryski/go-farm v0.0.0-20190423205320-6a90982ecee2 h1:tdlZCpZ/P9DhczCTSixgIKmwPv6+wP5DGjqLYw5SUiA=
 github.com/dgryski/go-farm v0.0.0-20190423205320-6a90982ecee2/go.mod h1:SqUrOPUnsFjfmXRMNPybcSiG0BgUW2AuFH8PAnS2iTw=
 github.com/dustin/go-humanize v1.0.0 h1:VSnTsYCnlFHaM2/igO1h6X3HA71jcobQuxemgkq4zYo=