From 16920d6cc753dfdab5b98c137a53cbc17dbba4b4 Mon Sep 17 00:00:00 2001
From: Manish R Jain <manish@dgraph.io>
Date: Wed, 21 Apr 2021 00:16:27 -0700
Subject: [PATCH 01/10] Accompanying code to Dgraph's mrjn/optimize-commits

---
 db.go              | 100 ++++++++++++++++++++++++++++++++++++---------
 db_test.go         |   2 +-
 managed_db_test.go |  38 +++++++++++++++++
 options.go         |   2 +-
 skl/skl.go         |   3 ++
 txn.go             |  44 ++++++++++++++++++--
 y/y.go             |   5 +++
 7 files changed, 169 insertions(+), 25 deletions(-)

diff --git a/db.go b/db.go
index 4e82443b2..13a730951 100644
--- a/db.go
+++ b/db.go
@@ -763,16 +763,9 @@ var requestPool = sync.Pool{
 }
 
 func (db *DB) writeToLSM(b *request) error {
-	// We should check the length of b.Prts and b.Entries only when badger is not
-	// running in InMemory mode. In InMemory mode, we don't write anything to the
-	// value log and that's why the length of b.Ptrs will always be zero.
-	if !db.opt.InMemory && len(b.Ptrs) != len(b.Entries) {
-		return errors.Errorf("Ptrs and Entries don't match: %+v", b)
-	}
-
 	for i, entry := range b.Entries {
 		var err error
-		if entry.skipVlogAndSetThreshold(db.valueThreshold()) {
+		if db.opt.managedTxns || entry.skipVlogAndSetThreshold(db.valueThreshold()) {
 			// Will include deletion / tombstone case.
 			err = db.mt.Put(entry.Key,
 				y.ValueStruct{
@@ -818,10 +811,13 @@ func (db *DB) writeRequests(reqs []*request) error {
 		}
 	}
 	db.opt.Debugf("writeRequests called. Writing to value log")
-	err := db.vlog.write(reqs)
-	if err != nil {
-		done(err)
-		return err
+	if !db.opt.managedTxns {
+		// Don't do value log writes in managed mode.
+		err := db.vlog.write(reqs)
+		if err != nil {
+			done(err)
+			return err
+		}
 	}
 
 	db.opt.Debugf("Sending updates to subscribers")
@@ -834,6 +830,7 @@ func (db *DB) writeRequests(reqs []*request) error {
 		}
 		count += len(b.Entries)
 		var i uint64
+		var err error
 		for err = db.ensureRoomForWrite(); err == errNoRoom; err = db.ensureRoomForWrite() {
 			i++
 			if i%100 == 0 {
@@ -1010,16 +1007,40 @@ func (db *DB) ensureRoomForWrite() error {
 	}
 }
 
+func (db *DB) HandoverSkiplist(skl *skl.Skiplist) error {
+	db.lock.Lock()
+	defer db.lock.Unlock()
+
+	mt := &memTable{sl: skl}
+	select {
+	case db.flushChan <- flushTask{mt: mt}:
+		db.imm = append(db.imm, mt)
+		return nil
+	default:
+		return errNoRoom
+	}
+}
+
 func arenaSize(opt Options) int64 {
 	return opt.MemTableSize + opt.maxBatchSize + opt.maxBatchCount*int64(skl.MaxNodeSize)
 }
 
+func (db *DB) NewSkiplist() *skl.Skiplist {
+	return skl.NewSkiplist(arenaSize(db.opt))
+}
+
 // buildL0Table builds a new table from the memtable.
 func buildL0Table(ft flushTask, bopts table.Options) *table.Builder {
-	iter := ft.mt.sl.NewIterator()
+	var iter y.Iterator
+	if ft.itr != nil {
+		iter = ft.itr
+	} else {
+		iter = ft.mt.sl.NewUniIterator(false)
+	}
 	defer iter.Close()
+
 	b := table.NewTableBuilder(bopts)
-	for iter.SeekToFirst(); iter.Valid(); iter.Next() {
+	for iter.Rewind(); iter.Valid(); iter.Next() {
 		if len(ft.dropPrefixes) > 0 && hasAnyPrefixes(iter.Key(), ft.dropPrefixes) {
 			continue
 		}
@@ -1035,15 +1056,16 @@ func buildL0Table(ft flushTask, bopts table.Options) *table.Builder {
 
 type flushTask struct {
 	mt           *memTable
+	itr          y.Iterator
 	dropPrefixes [][]byte
 }
 
 // handleFlushTask must be run serially.
 func (db *DB) handleFlushTask(ft flushTask) error {
 	// There can be a scenario, when empty memtable is flushed.
-	if ft.mt.sl.Empty() {
-		return nil
-	}
+	// if ft.mt.sl.Empty() {
+	// 	return nil
+	// }
 
 	bopts := buildTableOptions(db)
 	builder := buildL0Table(ft, bopts)
@@ -1080,11 +1102,45 @@ func (db *DB) handleFlushTask(ft flushTask) error {
 func (db *DB) flushMemtable(lc *z.Closer) error {
 	defer lc.Done()
 
+	var sz int64
+	var itrs []y.Iterator
+	var mts []*memTable
+	slurp := func() {
+		for {
+			select {
+			case more := <-db.flushChan:
+				sl := more.mt.sl
+				itrs = append(itrs, sl.NewUniIterator(false))
+				mts = append(mts, more.mt)
+
+				sz += sl.MemSize()
+				if sz > db.opt.MemTableSize {
+					db.opt.Infof("sz: %d > memtable size: %d\n", sz, db.opt.MemTableSize)
+					return
+				}
+			default:
+				return
+			}
+		}
+	}
+
 	for ft := range db.flushChan {
 		if ft.mt == nil {
 			// We close db.flushChan now, instead of sending a nil ft.mt.
 			continue
 		}
+		sz = ft.mt.sl.MemSize()
+		itrs = itrs[:0]
+		itrs = append(itrs, ft.mt.sl.NewUniIterator(false))
+		mts = append(mts, ft.mt)
+
+		// Pick more memtables, so we can really fill up the L0 table.
+		slurp()
+
+		db.opt.Infof("Picked %d memtables. Size: %d\n", len(itrs), sz)
+		ft.mt = nil
+		ft.itr = table.NewMergeIterator(itrs, false)
+
 		for {
 			err := db.handleFlushTask(ft)
 			if err == nil {
@@ -1095,9 +1151,11 @@ func (db *DB) flushMemtable(lc *z.Closer) error {
 				// which would arrive here would match db.imm[0], because we acquire a
 				// lock over DB when pushing to flushChan.
 				// TODO: This logic is dirty AF. Any change and this could easily break.
-				y.AssertTrue(ft.mt == db.imm[0])
-				db.imm = db.imm[1:]
-				ft.mt.DecrRef() // Return memory.
+				for _, mt := range mts {
+					y.AssertTrue(mt == db.imm[0])
+					db.imm = db.imm[1:]
+					mt.DecrRef() // Return memory.
+				}
 				db.lock.Unlock()
 
 				break
@@ -1106,6 +1164,8 @@ func (db *DB) flushMemtable(lc *z.Closer) error {
 			db.opt.Errorf("Failure while flushing memtable to disk: %v. Retrying...\n", err)
 			time.Sleep(time.Second)
 		}
+		// Reset everything.
+		itrs, mts, sz = itrs[:0], mts[:0], 0
 	}
 	return nil
 }
diff --git a/db_test.go b/db_test.go
index 079a93974..0423a0b31 100644
--- a/db_test.go
+++ b/db_test.go
@@ -2088,7 +2088,7 @@ func TestVerifyChecksum(t *testing.T) {
 			y.Check2(rand.Read(value))
 			st := 0
 
-			buf := z.NewBuffer(10 << 20, "test")
+			buf := z.NewBuffer(10<<20, "test")
 			defer buf.Release()
 			for i := 0; i < 1000; i++ {
 				key := make([]byte, 8)
diff --git a/managed_db_test.go b/managed_db_test.go
index c5112a091..bc919712f 100644
--- a/managed_db_test.go
+++ b/managed_db_test.go
@@ -771,6 +771,44 @@ func TestWriteBatchDuplicate(t *testing.T) {
 	})
 }
 
+func TestWriteViaSkip(t *testing.T) {
+	key := func(i int) []byte {
+		return []byte(fmt.Sprintf("%10d", i))
+	}
+	val := func(i int) []byte {
+		return []byte(fmt.Sprintf("%128d", i))
+	}
+	opt := DefaultOptions("")
+	opt.managedTxns = true
+	runBadgerTest(t, &opt, func(t *testing.T, db *DB) {
+		s := db.NewSkiplist()
+		for i := 0; i < 100; i++ {
+			s.Put(y.KeyWithTs(key(i), uint64(i+1)), y.ValueStruct{Value: val(i)})
+		}
+
+		// Hand over skiplist to Badger.
+		require.NoError(t, db.HandoverSkiplist(s))
+
+		// Read the data back.
+		txn := db.NewTransactionAt(100, false)
+		defer txn.Discard()
+		itr := txn.NewIterator(DefaultIteratorOptions)
+		defer itr.Close()
+
+		i := 0
+		for itr.Rewind(); itr.Valid(); itr.Next() {
+			item := itr.Item()
+			require.Equal(t, string(key(i)), string(item.Key()))
+			require.Equal(t, item.Version(), uint64(i+1))
+			valcopy, err := item.ValueCopy(nil)
+			require.NoError(t, err)
+			require.Equal(t, val(i), valcopy)
+			i++
+		}
+		require.Equal(t, 100, i)
+	})
+}
+
 func TestZeroDiscardStats(t *testing.T) {
 	N := uint64(10000)
 	populate := func(t *testing.T, db *DB) {
diff --git a/options.go b/options.go
index 880ac5e89..3f9ba3395 100644
--- a/options.go
+++ b/options.go
@@ -144,7 +144,7 @@ func DefaultOptions(path string) Options {
 		NumCompactors:           4, // Run at least 2 compactors. Zero-th compactor prioritizes L0.
 		NumLevelZeroTables:      5,
 		NumLevelZeroTablesStall: 15,
-		NumMemtables:            5,
+		NumMemtables:            15,
 		BloomFalsePositive:      0.01,
 		BlockSize:               4 * 1024,
 		SyncWrites:              false,
diff --git a/skl/skl.go b/skl/skl.go
index a39d34921..d0b9c7a19 100644
--- a/skl/skl.go
+++ b/skl/skl.go
@@ -282,6 +282,9 @@ func (s *Skiplist) getHeight() int32 {
 	return atomic.LoadInt32(&s.height)
 }
 
+func (s *Skiplist) PutSorted(key []byte, v y.ValueStruct) {
+}
+
 // Put inserts the key-value pair.
 func (s *Skiplist) Put(key []byte, v y.ValueStruct) {
 	// Since we allow overwrite, we may not need to create a new node. We might not even need to
diff --git a/txn.go b/txn.go
index 519906a3c..242850182 100644
--- a/txn.go
+++ b/txn.go
@@ -26,6 +26,7 @@ import (
 	"sync"
 	"sync/atomic"
 
+	"github.com/dgraph-io/badger/v3/skl"
 	"github.com/dgraph-io/badger/v3/y"
 	"github.com/dgraph-io/ristretto/z"
 	"github.com/pkg/errors"
@@ -358,9 +359,30 @@ func exceedsSize(prefix string, max int64, key []byte) error {
 		prefix, len(key), max, prefix, hex.Dump(key[:1<<10]))
 }
 
-func (txn *Txn) modify(e *Entry) error {
-	const maxKeySize = 65000
+const maxKeySize = 65000
+const maxValSize = 1 << 20
 
+func ValidEntry(db *DB, key, val []byte) error {
+	switch {
+	case len(key) == 0:
+		return ErrEmptyKey
+	case bytes.HasPrefix(key, badgerPrefix):
+		return ErrInvalidKey
+	case len(key) > maxKeySize:
+		// Key length can't be more than uint16, as determined by table::header.  To
+		// keep things safe and allow badger move prefix and a timestamp suffix, let's
+		// cut it down to 65000, instead of using 65536.
+		return exceedsSize("Key", maxKeySize, key)
+	case int64(len(val)) > maxValSize:
+		return exceedsSize("Value", maxValSize, val)
+	}
+	if err := db.isBanned(key); err != nil {
+		return err
+	}
+	return nil
+}
+
+func (txn *Txn) modify(e *Entry) error {
 	switch {
 	case !txn.update:
 		return ErrReadOnlyTxn
@@ -384,7 +406,6 @@ func (txn *Txn) modify(e *Entry) error {
 	if err := txn.db.isBanned(e.Key); err != nil {
 		return err
 	}
-
 	if err := txn.checkSize(e); err != nil {
 		return err
 	}
@@ -742,6 +763,23 @@ func (txn *Txn) ReadTs() uint64 {
 	return txn.readTs
 }
 
+func (txn *Txn) ToSkipList(s *skl.Skiplist, commitTs uint64) {
+	for _, e := range txn.pendingWrites {
+		e.Key = y.KeyWithTs(e.Key, commitTs)
+		s.Put(e.Key,
+			y.ValueStruct{
+				Value: e.Value,
+				// Ensure value pointer flag is removed. Otherwise, the value will fail
+				// to be retrieved during iterator prefetch. `bitValuePointer` is only
+				// known to be set in write to LSM when the entry is loaded from a backup
+				// with lower ValueThreshold and its value was stored in the value log.
+				Meta:      e.meta &^ bitValuePointer,
+				UserMeta:  e.UserMeta,
+				ExpiresAt: e.ExpiresAt,
+			})
+	}
+}
+
 // NewTransaction creates a new transaction. Badger supports concurrent execution of transactions,
 // providing serializable snapshot isolation, avoiding write skews. Badger achieves this by tracking
 // the keys read and at Commit time, ensuring that these read keys weren't concurrently modified by
diff --git a/y/y.go b/y/y.go
index 38217fb4e..22c8035f9 100644
--- a/y/y.go
+++ b/y/y.go
@@ -121,6 +121,11 @@ func Copy(a []byte) []byte {
 	return b
 }
 
+func SetKeyTs(key []byte, ts uint64) {
+	start := len(key) - 8
+	binary.BigEndian.PutUint64(key[start:], math.MaxUint64-ts)
+}
+
 // KeyWithTs generates a new key by appending ts to key.
 func KeyWithTs(key []byte, ts uint64) []byte {
 	out := make([]byte, len(key)+8)

From b7b640094aac0632fa8e06be0fff68758013b68e Mon Sep 17 00:00:00 2001
From: Manish R Jain <manish@dgraph.io>
Date: Wed, 21 Apr 2021 12:17:19 -0700
Subject: [PATCH 02/10] Expose Skiplist

---
 skl/skl.go | 4 ++++
 1 file changed, 4 insertions(+)

diff --git a/skl/skl.go b/skl/skl.go
index 4dc791c19..fd700e7de 100644
--- a/skl/skl.go
+++ b/skl/skl.go
@@ -569,3 +569,7 @@ func (b *Builder) Add(k []byte, v y.ValueStruct) {
 		b.prev[i] = x
 	}
 }
+
+func (b *Builder) Skiplist() *Skiplist {
+	return b.s
+}

From 11fb1ecc86c16d3ab7df1ffb03e0e0ecba40088c Mon Sep 17 00:00:00 2001
From: Manish R Jain <manish@dgraph.io>
Date: Mon, 26 Apr 2021 11:26:19 -0700
Subject: [PATCH 03/10] Comment out Badger log

---
 db.go | 2 +-
 1 file changed, 1 insertion(+), 1 deletion(-)

diff --git a/db.go b/db.go
index 13a730951..bd0cceddf 100644
--- a/db.go
+++ b/db.go
@@ -1137,7 +1137,7 @@ func (db *DB) flushMemtable(lc *z.Closer) error {
 		// Pick more memtables, so we can really fill up the L0 table.
 		slurp()
 
-		db.opt.Infof("Picked %d memtables. Size: %d\n", len(itrs), sz)
+		// db.opt.Infof("Picked %d memtables. Size: %d\n", len(itrs), sz)
 		ft.mt = nil
 		ft.itr = table.NewMergeIterator(itrs, false)
 

From 5e5b3668c107019a5d83e774720a9f4892abdff5 Mon Sep 17 00:00:00 2001
From: Manish R Jain <manish@dgraph.io>
Date: Mon, 26 Apr 2021 11:51:08 -0700
Subject: [PATCH 04/10] Introduce callbacks for HandoverSkiplist

---
 db.go | 19 +++++++++++++++----
 1 file changed, 15 insertions(+), 4 deletions(-)

diff --git a/db.go b/db.go
index bd0cceddf..f508a867c 100644
--- a/db.go
+++ b/db.go
@@ -1007,13 +1007,13 @@ func (db *DB) ensureRoomForWrite() error {
 	}
 }
 
-func (db *DB) HandoverSkiplist(skl *skl.Skiplist) error {
+func (db *DB) HandoverSkiplist(skl *skl.Skiplist, callback func()) error {
 	db.lock.Lock()
 	defer db.lock.Unlock()
 
 	mt := &memTable{sl: skl}
 	select {
-	case db.flushChan <- flushTask{mt: mt}:
+	case db.flushChan <- flushTask{mt: mt, cb: callback}:
 		db.imm = append(db.imm, mt)
 		return nil
 	default:
@@ -1056,6 +1056,7 @@ func buildL0Table(ft flushTask, bopts table.Options) *table.Builder {
 
 type flushTask struct {
 	mt           *memTable
+	cb           func()
 	itr          y.Iterator
 	dropPrefixes [][]byte
 }
@@ -1105,6 +1106,7 @@ func (db *DB) flushMemtable(lc *z.Closer) error {
 	var sz int64
 	var itrs []y.Iterator
 	var mts []*memTable
+	var cbs []func()
 	slurp := func() {
 		for {
 			select {
@@ -1112,6 +1114,7 @@ func (db *DB) flushMemtable(lc *z.Closer) error {
 				sl := more.mt.sl
 				itrs = append(itrs, sl.NewUniIterator(false))
 				mts = append(mts, more.mt)
+				cbs = append(cbs, more.cb)
 
 				sz += sl.MemSize()
 				if sz > db.opt.MemTableSize {
@@ -1130,9 +1133,11 @@ func (db *DB) flushMemtable(lc *z.Closer) error {
 			continue
 		}
 		sz = ft.mt.sl.MemSize()
-		itrs = itrs[:0]
+		// Reset of itrs, mts etc. is being done below.
+		y.AssertTrue(len(itrs) == 0 && len(mts) == 0 && len(cbs) == 0)
 		itrs = append(itrs, ft.mt.sl.NewUniIterator(false))
 		mts = append(mts, ft.mt)
+		cbs = append(cbs, ft.cb)
 
 		// Pick more memtables, so we can really fill up the L0 table.
 		slurp()
@@ -1140,6 +1145,7 @@ func (db *DB) flushMemtable(lc *z.Closer) error {
 		// db.opt.Infof("Picked %d memtables. Size: %d\n", len(itrs), sz)
 		ft.mt = nil
 		ft.itr = table.NewMergeIterator(itrs, false)
+		ft.cb = nil
 
 		for {
 			err := db.handleFlushTask(ft)
@@ -1158,6 +1164,11 @@ func (db *DB) flushMemtable(lc *z.Closer) error {
 				}
 				db.lock.Unlock()
 
+				for _, cb := range cbs {
+					if cb != nil {
+						cb()
+					}
+				}
 				break
 			}
 			// Encountered error. Retry indefinitely.
@@ -1165,7 +1176,7 @@ func (db *DB) flushMemtable(lc *z.Closer) error {
 			time.Sleep(time.Second)
 		}
 		// Reset everything.
-		itrs, mts, sz = itrs[:0], mts[:0], 0
+		itrs, mts, cbs, sz = itrs[:0], mts[:0], cbs[:0], 0
 	}
 	return nil
 }

From 590924854539203cadb95f0774944c03d1b7b251 Mon Sep 17 00:00:00 2001
From: Manish R Jain <manish@dgraph.io>
Date: Mon, 26 Apr 2021 14:15:13 -0700
Subject: [PATCH 05/10] Modify test and remove code.

---
 managed_db_test.go | 18 ++++++++++++++----
 skl/skl.go         |  3 ---
 txn.go             | 18 ------------------
 3 files changed, 14 insertions(+), 25 deletions(-)

diff --git a/managed_db_test.go b/managed_db_test.go
index bc919712f..844ef1b02 100644
--- a/managed_db_test.go
+++ b/managed_db_test.go
@@ -783,14 +783,24 @@ func TestWriteViaSkip(t *testing.T) {
 	runBadgerTest(t, &opt, func(t *testing.T, db *DB) {
 		s := db.NewSkiplist()
 		for i := 0; i < 100; i++ {
-			s.Put(y.KeyWithTs(key(i), uint64(i+1)), y.ValueStruct{Value: val(i)})
+			s.Put(y.KeyWithTs(key(i), math.MaxUint64), y.ValueStruct{Value: val(i)})
+		}
+		{
+			// Update key timestamps by directly changing them in the skiplist.
+			itr := s.NewUniIterator(false)
+			defer itr.Close()
+			itr.Rewind()
+			for itr.Valid() {
+				y.SetKeyTs(itr.Key(), 101)
+				itr.Next()
+			}
 		}
 
 		// Hand over skiplist to Badger.
-		require.NoError(t, db.HandoverSkiplist(s))
+		require.NoError(t, db.HandoverSkiplist(s, nil))
 
 		// Read the data back.
-		txn := db.NewTransactionAt(100, false)
+		txn := db.NewTransactionAt(101, false)
 		defer txn.Discard()
 		itr := txn.NewIterator(DefaultIteratorOptions)
 		defer itr.Close()
@@ -799,7 +809,7 @@ func TestWriteViaSkip(t *testing.T) {
 		for itr.Rewind(); itr.Valid(); itr.Next() {
 			item := itr.Item()
 			require.Equal(t, string(key(i)), string(item.Key()))
-			require.Equal(t, item.Version(), uint64(i+1))
+			require.Equal(t, item.Version(), uint64(101))
 			valcopy, err := item.ValueCopy(nil)
 			require.NoError(t, err)
 			require.Equal(t, val(i), valcopy)
diff --git a/skl/skl.go b/skl/skl.go
index d51ed3c9b..64ee536d3 100644
--- a/skl/skl.go
+++ b/skl/skl.go
@@ -296,9 +296,6 @@ func (s *Skiplist) getHeight() int32 {
 	return atomic.LoadInt32(&s.height)
 }
 
-func (s *Skiplist) PutSorted(key []byte, v y.ValueStruct) {
-}
-
 // Put inserts the key-value pair.
 func (s *Skiplist) Put(key []byte, v y.ValueStruct) {
 	// Since we allow overwrite, we may not need to create a new node. We might not even need to
diff --git a/txn.go b/txn.go
index 242850182..c0cb20d67 100644
--- a/txn.go
+++ b/txn.go
@@ -26,7 +26,6 @@ import (
 	"sync"
 	"sync/atomic"
 
-	"github.com/dgraph-io/badger/v3/skl"
 	"github.com/dgraph-io/badger/v3/y"
 	"github.com/dgraph-io/ristretto/z"
 	"github.com/pkg/errors"
@@ -763,23 +762,6 @@ func (txn *Txn) ReadTs() uint64 {
 	return txn.readTs
 }
 
-func (txn *Txn) ToSkipList(s *skl.Skiplist, commitTs uint64) {
-	for _, e := range txn.pendingWrites {
-		e.Key = y.KeyWithTs(e.Key, commitTs)
-		s.Put(e.Key,
-			y.ValueStruct{
-				Value: e.Value,
-				// Ensure value pointer flag is removed. Otherwise, the value will fail
-				// to be retrieved during iterator prefetch. `bitValuePointer` is only
-				// known to be set in write to LSM when the entry is loaded from a backup
-				// with lower ValueThreshold and its value was stored in the value log.
-				Meta:      e.meta &^ bitValuePointer,
-				UserMeta:  e.UserMeta,
-				ExpiresAt: e.ExpiresAt,
-			})
-	}
-}
-
 // NewTransaction creates a new transaction. Badger supports concurrent execution of transactions,
 // providing serializable snapshot isolation, avoiding write skews. Badger achieves this by tracking
 // the keys read and at Commit time, ensuring that these read keys weren't concurrently modified by

From 49b0be72de55d1965e709b4f5b2074d8ec1bfa2b Mon Sep 17 00:00:00 2001
From: Manish R Jain <manish@dgraph.io>
Date: Mon, 26 Apr 2021 14:22:31 -0700
Subject: [PATCH 06/10] Self review

---
 db.go | 7 +------
 1 file changed, 1 insertion(+), 6 deletions(-)

diff --git a/db.go b/db.go
index f508a867c..8326ae2e0 100644
--- a/db.go
+++ b/db.go
@@ -1063,11 +1063,7 @@ type flushTask struct {
 
 // handleFlushTask must be run serially.
 func (db *DB) handleFlushTask(ft flushTask) error {
-	// There can be a scenario, when empty memtable is flushed.
-	// if ft.mt.sl.Empty() {
-	// 	return nil
-	// }
-
+	// ft.mt could be nil with ft.itr being the valid field.
 	bopts := buildTableOptions(db)
 	builder := buildL0Table(ft, bopts)
 	defer builder.Close()
@@ -1118,7 +1114,6 @@ func (db *DB) flushMemtable(lc *z.Closer) error {
 
 				sz += sl.MemSize()
 				if sz > db.opt.MemTableSize {
-					db.opt.Infof("sz: %d > memtable size: %d\n", sz, db.opt.MemTableSize)
 					return
 				}
 			default:

From 45540cedb11c184abc9ffff8283552caf1b7cca9 Mon Sep 17 00:00:00 2001
From: Manish R Jain <manish@dgraph.io>
Date: Mon, 26 Apr 2021 14:45:08 -0700
Subject: [PATCH 07/10] Fix up

---
 db.go | 3 +++
 1 file changed, 3 insertions(+)

diff --git a/db.go b/db.go
index 8326ae2e0..1c9597066 100644
--- a/db.go
+++ b/db.go
@@ -1107,6 +1107,9 @@ func (db *DB) flushMemtable(lc *z.Closer) error {
 		for {
 			select {
 			case more := <-db.flushChan:
+				if more.mt == nil {
+					return
+				}
 				sl := more.mt.sl
 				itrs = append(itrs, sl.NewUniIterator(false))
 				mts = append(mts, more.mt)

From 43dc39a487846619c341a87941715a9066ccf061 Mon Sep 17 00:00:00 2001
From: Manish R Jain <manish@dgraph.io>
Date: Mon, 26 Apr 2021 15:40:33 -0700
Subject: [PATCH 08/10] Skip a test not relevant anymore.

---
 value_test.go | 2 ++
 1 file changed, 2 insertions(+)

diff --git a/value_test.go b/value_test.go
index a2e95dd66..093d03cee 100644
--- a/value_test.go
+++ b/value_test.go
@@ -136,6 +136,8 @@ func TestValueBasic(t *testing.T) {
 }
 
 func TestValueGCManaged(t *testing.T) {
+	t.Skipf("Value Log is not used in managed mode.")
+
 	dir, err := ioutil.TempDir("", "badger-test")
 	require.NoError(t, err)
 	defer removeDir(dir)

From 52a9a68e6310f9ae4547b9a8d6f9e20e255f5f92 Mon Sep 17 00:00:00 2001
From: Manish R Jain <manish@dgraph.io>
Date: Mon, 26 Apr 2021 21:10:45 -0700
Subject: [PATCH 09/10] Expose a bit

---
 backup_test.go |  2 +-
 db.go          | 18 ++++++++++++++++++
 iterator.go    |  2 +-
 levels.go      |  4 ++--
 levels_test.go | 34 +++++++++++++++++-----------------
 merge.go       |  2 +-
 structs.go     |  2 +-
 value.go       |  2 +-
 8 files changed, 42 insertions(+), 24 deletions(-)

diff --git a/backup_test.go b/backup_test.go
index 310d46fa4..6928e5407 100644
--- a/backup_test.go
+++ b/backup_test.go
@@ -446,7 +446,7 @@ func TestBackupLoadIncremental(t *testing.T) {
 				if err := txn.SetEntry(entry); err != nil {
 					return err
 				}
-				updates[i] = bitDiscardEarlierVersions
+				updates[i] = BitDiscardEarlierVersions
 			}
 			return nil
 		})
diff --git a/db.go b/db.go
index 1c9597066..18ce669fa 100644
--- a/db.go
+++ b/db.go
@@ -1011,6 +1011,24 @@ func (db *DB) HandoverSkiplist(skl *skl.Skiplist, callback func()) error {
 	db.lock.Lock()
 	defer db.lock.Unlock()
 
+	// If we have some data in db.mt, we should push that first, so the ordering of writes is
+	// maintained.
+	if !db.mt.sl.Empty() {
+		sz := db.mt.sl.MemSize()
+		db.opt.Infof("Handover found %d B data in current memtable. Pushing to flushChan.", sz)
+		var err error
+		select {
+		case db.flushChan <- flushTask{mt: db.mt}:
+			db.imm = append(db.imm, db.mt)
+			db.mt, err = db.newMemTable()
+			if err != nil {
+				return y.Wrapf(err, "cannot push current memtable")
+			}
+		default:
+			return errNoRoom
+		}
+	}
+
 	mt := &memTable{sl: skl}
 	select {
 	case db.flushChan <- flushTask{mt: mt, cb: callback}:
diff --git a/iterator.go b/iterator.go
index c84392085..dd3cb022e 100644
--- a/iterator.go
+++ b/iterator.go
@@ -146,7 +146,7 @@ func (item *Item) IsDeletedOrExpired() bool {
 // DiscardEarlierVersions returns whether the item was created with the
 // option to discard earlier versions of a key when multiple are available.
 func (item *Item) DiscardEarlierVersions() bool {
-	return item.meta&bitDiscardEarlierVersions > 0
+	return item.meta&BitDiscardEarlierVersions > 0
 }
 
 func (item *Item) yieldItemValue() ([]byte, func(), error) {
diff --git a/levels.go b/levels.go
index 9fa761e63..22bcfab4e 100644
--- a/levels.go
+++ b/levels.go
@@ -716,7 +716,7 @@ func (s *levelsController) subcompact(it y.Iterator, kr keyRange, cd compactDef,
 				}
 				lastKey = y.SafeCopy(lastKey, it.Key())
 				numVersions = 0
-				firstKeyHasDiscardSet = it.Value().Meta&bitDiscardEarlierVersions > 0
+				firstKeyHasDiscardSet = it.Value().Meta&BitDiscardEarlierVersions > 0
 
 				if len(tableKr.left) == 0 {
 					tableKr.left = y.SafeCopy(tableKr.left, it.Key())
@@ -753,7 +753,7 @@ func (s *levelsController) subcompact(it y.Iterator, kr keyRange, cd compactDef,
 				// - The `discardEarlierVersions` bit is set OR
 				// - We've already processed `NumVersionsToKeep` number of versions
 				// (including the current item being processed)
-				lastValidVersion := vs.Meta&bitDiscardEarlierVersions > 0 ||
+				lastValidVersion := vs.Meta&BitDiscardEarlierVersions > 0 ||
 					numVersions == s.kv.opt.NumVersionsToKeep
 
 				if isExpired || lastValidVersion {
diff --git a/levels_test.go b/levels_test.go
index 56dafa8c8..fed89ef5f 100644
--- a/levels_test.go
+++ b/levels_test.go
@@ -707,11 +707,11 @@ func TestDiscardFirstVersion(t *testing.T) {
 
 	runBadgerTest(t, &opt, func(t *testing.T, db *DB) {
 		l0 := []keyValVersion{{"foo", "bar", 1, 0}}
-		l01 := []keyValVersion{{"foo", "bar", 2, bitDiscardEarlierVersions}}
+		l01 := []keyValVersion{{"foo", "bar", 2, BitDiscardEarlierVersions}}
 		l02 := []keyValVersion{{"foo", "bar", 3, 0}}
 		l03 := []keyValVersion{{"foo", "bar", 4, 0}}
 		l04 := []keyValVersion{{"foo", "bar", 9, 0}}
-		l05 := []keyValVersion{{"foo", "bar", 10, bitDiscardEarlierVersions}}
+		l05 := []keyValVersion{{"foo", "bar", 10, BitDiscardEarlierVersions}}
 
 		// Level 0 has all the tables.
 		createAndOpen(db, l0, 0)
@@ -742,11 +742,11 @@ func TestDiscardFirstVersion(t *testing.T) {
 		// - Version 1 is below DiscardTS and below the first "bitDiscardEarlierVersions"
 		//   marker so IT WILL BE REMOVED.
 		ExpectedKeys := []keyValVersion{
-			{"foo", "bar", 10, bitDiscardEarlierVersions},
+			{"foo", "bar", 10, BitDiscardEarlierVersions},
 			{"foo", "bar", 9, 0},
 			{"foo", "bar", 4, 0},
 			{"foo", "bar", 3, 0},
-			{"foo", "bar", 2, bitDiscardEarlierVersions}}
+			{"foo", "bar", 2, BitDiscardEarlierVersions}}
 
 		getAllAndCheck(t, db, ExpectedKeys)
 	})
@@ -1060,15 +1060,15 @@ func TestSameLevel(t *testing.T) {
 	opt.LmaxCompaction = true
 	runBadgerTest(t, &opt, func(t *testing.T, db *DB) {
 		l6 := []keyValVersion{
-			{"A", "bar", 4, bitDiscardEarlierVersions}, {"A", "bar", 3, 0},
+			{"A", "bar", 4, BitDiscardEarlierVersions}, {"A", "bar", 3, 0},
 			{"A", "bar", 2, 0}, {"Afoo", "baz", 2, 0},
 		}
 		l61 := []keyValVersion{
-			{"B", "bar", 4, bitDiscardEarlierVersions}, {"B", "bar", 3, 0},
+			{"B", "bar", 4, BitDiscardEarlierVersions}, {"B", "bar", 3, 0},
 			{"B", "bar", 2, 0}, {"Bfoo", "baz", 2, 0},
 		}
 		l62 := []keyValVersion{
-			{"C", "bar", 4, bitDiscardEarlierVersions}, {"C", "bar", 3, 0},
+			{"C", "bar", 4, BitDiscardEarlierVersions}, {"C", "bar", 3, 0},
 			{"C", "bar", 2, 0}, {"Cfoo", "baz", 2, 0},
 		}
 		createAndOpen(db, l6, 6)
@@ -1077,11 +1077,11 @@ func TestSameLevel(t *testing.T) {
 		require.NoError(t, db.lc.validate())
 
 		getAllAndCheck(t, db, []keyValVersion{
-			{"A", "bar", 4, bitDiscardEarlierVersions}, {"A", "bar", 3, 0},
+			{"A", "bar", 4, BitDiscardEarlierVersions}, {"A", "bar", 3, 0},
 			{"A", "bar", 2, 0}, {"Afoo", "baz", 2, 0},
-			{"B", "bar", 4, bitDiscardEarlierVersions}, {"B", "bar", 3, 0},
+			{"B", "bar", 4, BitDiscardEarlierVersions}, {"B", "bar", 3, 0},
 			{"B", "bar", 2, 0}, {"Bfoo", "baz", 2, 0},
-			{"C", "bar", 4, bitDiscardEarlierVersions}, {"C", "bar", 3, 0},
+			{"C", "bar", 4, BitDiscardEarlierVersions}, {"C", "bar", 3, 0},
 			{"C", "bar", 2, 0}, {"Cfoo", "baz", 2, 0},
 		})
 
@@ -1097,11 +1097,11 @@ func TestSameLevel(t *testing.T) {
 		db.SetDiscardTs(3)
 		require.NoError(t, db.lc.runCompactDef(-1, 6, cdef))
 		getAllAndCheck(t, db, []keyValVersion{
-			{"A", "bar", 4, bitDiscardEarlierVersions}, {"A", "bar", 3, 0},
+			{"A", "bar", 4, BitDiscardEarlierVersions}, {"A", "bar", 3, 0},
 			{"A", "bar", 2, 0}, {"Afoo", "baz", 2, 0},
-			{"B", "bar", 4, bitDiscardEarlierVersions}, {"B", "bar", 3, 0},
+			{"B", "bar", 4, BitDiscardEarlierVersions}, {"B", "bar", 3, 0},
 			{"B", "bar", 2, 0}, {"Bfoo", "baz", 2, 0},
-			{"C", "bar", 4, bitDiscardEarlierVersions}, {"C", "bar", 3, 0},
+			{"C", "bar", 4, BitDiscardEarlierVersions}, {"C", "bar", 3, 0},
 			{"C", "bar", 2, 0}, {"Cfoo", "baz", 2, 0},
 		})
 
@@ -1118,9 +1118,9 @@ func TestSameLevel(t *testing.T) {
 		cdef.t.baseLevel = 1
 		require.NoError(t, db.lc.runCompactDef(-1, 6, cdef))
 		getAllAndCheck(t, db, []keyValVersion{
-			{"A", "bar", 4, bitDiscardEarlierVersions}, {"Afoo", "baz", 2, 0},
-			{"B", "bar", 4, bitDiscardEarlierVersions}, {"Bfoo", "baz", 2, 0},
-			{"C", "bar", 4, bitDiscardEarlierVersions}, {"Cfoo", "baz", 2, 0}})
+			{"A", "bar", 4, BitDiscardEarlierVersions}, {"Afoo", "baz", 2, 0},
+			{"B", "bar", 4, BitDiscardEarlierVersions}, {"Bfoo", "baz", 2, 0},
+			{"C", "bar", 4, BitDiscardEarlierVersions}, {"Cfoo", "baz", 2, 0}})
 		require.NoError(t, db.lc.validate())
 	})
 }
@@ -1186,7 +1186,7 @@ func TestStaleDataCleanup(t *testing.T) {
 			for i := count; i > 0; i-- {
 				var meta byte
 				if i == 0 {
-					meta = bitDiscardEarlierVersions
+					meta = BitDiscardEarlierVersions
 				}
 				b.AddStaleKey(y.KeyWithTs(key, i), y.ValueStruct{Meta: meta, Value: val}, 0)
 			}
diff --git a/merge.go b/merge.go
index ac1a2b51e..0e63dde15 100644
--- a/merge.go
+++ b/merge.go
@@ -116,7 +116,7 @@ func (op *MergeOperator) compact() error {
 		{
 			Key:   y.KeyWithTs(op.key, version),
 			Value: val,
-			meta:  bitDiscardEarlierVersions,
+			meta:  BitDiscardEarlierVersions,
 		},
 	}
 	// Write value back to the DB. It is important that we do not set the bitMergeEntry bit
diff --git a/structs.go b/structs.go
index c17f818cf..dc5865b3e 100644
--- a/structs.go
+++ b/structs.go
@@ -206,7 +206,7 @@ func (e *Entry) WithMeta(meta byte) *Entry {
 // have a higher setting for NumVersionsToKeep (in Dgraph, we set it to infinity), you can use this
 // method to indicate that all the older versions can be discarded and removed during compactions.
 func (e *Entry) WithDiscard() *Entry {
-	e.meta = bitDiscardEarlierVersions
+	e.meta = BitDiscardEarlierVersions
 	return e
 }
 
diff --git a/value.go b/value.go
index 6e8f9178e..8cef20935 100644
--- a/value.go
+++ b/value.go
@@ -47,7 +47,7 @@ var maxVlogFileSize uint32 = math.MaxUint32
 const (
 	bitDelete                 byte = 1 << 0 // Set if the key has been deleted.
 	bitValuePointer           byte = 1 << 1 // Set if the value is NOT stored directly next to key.
-	bitDiscardEarlierVersions byte = 1 << 2 // Set if earlier versions can be discarded.
+	BitDiscardEarlierVersions byte = 1 << 2 // Set if earlier versions can be discarded.
 	// Set if item shouldn't be discarded via compactions (used by merge operator)
 	bitMergeEntry byte = 1 << 3
 	// The MSB 2 bits are for transactions.

From 5169f7f212fd530b32c610c3ba8a05774f62cf0a Mon Sep 17 00:00:00 2001
From: Manish R Jain <manish@dgraph.io>
Date: Mon, 26 Apr 2021 21:34:00 -0700
Subject: [PATCH 10/10] Add a panic for usage

---
 db.go | 3 +++
 1 file changed, 3 insertions(+)

diff --git a/db.go b/db.go
index 18ce669fa..908fdb08f 100644
--- a/db.go
+++ b/db.go
@@ -1008,6 +1008,9 @@ func (db *DB) ensureRoomForWrite() error {
 }
 
 func (db *DB) HandoverSkiplist(skl *skl.Skiplist, callback func()) error {
+	if !db.opt.managedTxns {
+		panic("Handover Skiplist is only available in managed mode.")
+	}
 	db.lock.Lock()
 	defer db.lock.Unlock()