Skip to content

Commit

Permalink
Revert "feat(Skiplist): Introduce a way to hand over skiplists to Bad…
Browse files Browse the repository at this point in the history
…ger (#1696)"

This reverts commit b21f591.
  • Loading branch information
joshua-goldstein committed Jan 5, 2023
1 parent 2b93727 commit ff5c908
Show file tree
Hide file tree
Showing 15 changed files with 47 additions and 216 deletions.
2 changes: 1 addition & 1 deletion backup_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -446,7 +446,7 @@ func TestBackupLoadIncremental(t *testing.T) {
if err := txn.SetEntry(entry); err != nil {
return err
}
updates[i] = BitDiscardEarlierVersions
updates[i] = bitDiscardEarlierVersions
}
return nil
})
Expand Down
126 changes: 18 additions & 108 deletions db.go
Original file line number Diff line number Diff line change
Expand Up @@ -758,11 +758,16 @@ var requestPool = sync.Pool{
}

func (db *DB) writeToLSM(b *request) error {
db.lock.RLock()
defer db.lock.RUnlock()
// We should check the length of b.Ptrs and b.Entries only when badger is not
// running in InMemory mode. In InMemory mode, we don't write anything to the
// value log and that's why the length of b.Ptrs will always be zero.
if !db.opt.InMemory && len(b.Ptrs) != len(b.Entries) {
return errors.Errorf("Ptrs and Entries don't match: %+v", b)
}

for i, entry := range b.Entries {
var err error
if db.opt.managedTxns || entry.skipVlogAndSetThreshold(db.valueThreshold()) {
if entry.skipVlogAndSetThreshold(db.valueThreshold()) {
// Will include deletion / tombstone case.
err = db.mt.Put(entry.Key,
y.ValueStruct{
Expand Down Expand Up @@ -824,7 +829,6 @@ func (db *DB) writeRequests(reqs []*request) error {
}
count += len(b.Entries)
var i uint64
var err error
for err = db.ensureRoomForWrite(); err == errNoRoom; err = db.ensureRoomForWrite() {
i++
if i%100 == 0 {
Expand Down Expand Up @@ -1003,62 +1007,16 @@ func (db *DB) ensureRoomForWrite() error {
}
}

// HandoverSkiplist hands ownership of skl over to Badger, queuing it for a
// flush to L0. callback (may be nil) is carried on the flush task and is
// invoked after the flush completes. Panics unless the DB is running in
// managed mode. Returns errNoRoom when the flush channel is full, in which
// case the caller may retry.
func (db *DB) HandoverSkiplist(skl *skl.Skiplist, callback func()) error {
	if !db.opt.managedTxns {
		panic("Handover Skiplist is only available in managed mode.")
	}
	// Exclusive lock: we may swap out db.mt and append to db.imm below.
	db.lock.Lock()
	defer db.lock.Unlock()

	// If we have some data in db.mt, we should push that first, so the ordering of writes is
	// maintained.
	if !db.mt.sl.Empty() {
		sz := db.mt.sl.MemSize()
		db.opt.Infof("Handover found %d B data in current memtable. Pushing to flushChan.", sz)
		var err error
		select {
		case db.flushChan <- flushTask{mt: db.mt}:
			// Current memtable is now immutable; start a fresh one for writes.
			db.imm = append(db.imm, db.mt)
			db.mt, err = db.newMemTable()
			if err != nil {
				return y.Wrapf(err, "cannot push current memtable")
			}
		default:
			// Non-blocking send failed: flusher is backed up.
			return errNoRoom
		}
	}

	// Wrap the handed-over skiplist in a memTable so it flows through the
	// normal flush pipeline.
	mt := &memTable{sl: skl}

	select {
	case db.flushChan <- flushTask{mt: mt, cb: callback}:
		db.imm = append(db.imm, mt)
		return nil
	default:
		return errNoRoom
	}
}

// arenaSize returns the size of the skiplist arena: the configured memtable
// size plus headroom for a maximal write batch (maxBatchSize, plus per-entry
// skiplist node overhead via maxBatchCount * MaxNodeSize).
func arenaSize(opt Options) int64 {
	return opt.MemTableSize + opt.maxBatchSize + opt.maxBatchCount*int64(skl.MaxNodeSize)
}

// NewSkiplist returns a fresh skiplist sized with arenaSize(db.opt), so that
// it is compatible with the DB's memtable sizing (e.g. for use with
// HandoverSkiplist).
func (db *DB) NewSkiplist() *skl.Skiplist {
	return skl.NewSkiplist(arenaSize(db.opt))
}

// buildL0Table builds a new table from the memtable.
func buildL0Table(ft flushTask, bopts table.Options) *table.Builder {
var iter y.Iterator
if ft.itr != nil {
iter = ft.itr
} else {
iter = ft.mt.sl.NewUniIterator(false)
}
iter := ft.mt.sl.NewIterator()
defer iter.Close()

b := table.NewTableBuilder(bopts)
for iter.Rewind(); iter.Valid(); iter.Next() {
for iter.SeekToFirst(); iter.Valid(); iter.Next() {
if len(ft.dropPrefixes) > 0 && hasAnyPrefixes(iter.Key(), ft.dropPrefixes) {
continue
}
Expand All @@ -1074,14 +1032,16 @@ func buildL0Table(ft flushTask, bopts table.Options) *table.Builder {

// flushTask is one unit of work for the memtable flusher goroutine.
type flushTask struct {
	mt *memTable // memtable to flush; may be nil when itr is the valid field
	cb func()    // optional callback invoked after the flush has been handled
	itr y.Iterator // optional iterator over (possibly merged) memtable contents
	dropPrefixes [][]byte // keys with any of these prefixes are skipped when building L0
}

// handleFlushTask must be run serially.
func (db *DB) handleFlushTask(ft flushTask) error {
// ft.mt could be nil with ft.itr being the valid field.
// There can be a scenario, when empty memtable is flushed.
if ft.mt.sl.Empty() {
return nil
}

bopts := buildTableOptions(db)
builder := buildL0Table(ft, bopts)
defer builder.Close()
Expand Down Expand Up @@ -1117,52 +1077,11 @@ func (db *DB) handleFlushTask(ft flushTask) error {
func (db *DB) flushMemtable(lc *z.Closer) error {
defer lc.Done()

var sz int64
var itrs []y.Iterator
var mts []*memTable
var cbs []func()
slurp := func() {
for {
select {
case more := <-db.flushChan:
if more.mt == nil {
return
}
sl := more.mt.sl
itrs = append(itrs, sl.NewUniIterator(false))
mts = append(mts, more.mt)
cbs = append(cbs, more.cb)

sz += sl.MemSize()
if sz > db.opt.MemTableSize {
return
}
default:
return
}
}
}

for ft := range db.flushChan {
if ft.mt == nil {
// We close db.flushChan now, instead of sending a nil ft.mt.
continue
}
sz = ft.mt.sl.MemSize()
// Reset of itrs, mts etc. is being done below.
y.AssertTrue(len(itrs) == 0 && len(mts) == 0 && len(cbs) == 0)
itrs = append(itrs, ft.mt.sl.NewUniIterator(false))
mts = append(mts, ft.mt)
cbs = append(cbs, ft.cb)

// Pick more memtables, so we can really fill up the L0 table.
slurp()

// db.opt.Infof("Picked %d memtables. Size: %d\n", len(itrs), sz)
ft.mt = nil
ft.itr = table.NewMergeIterator(itrs, false)
ft.cb = nil

for {
err := db.handleFlushTask(ft)
if err == nil {
Expand All @@ -1173,26 +1092,17 @@ func (db *DB) flushMemtable(lc *z.Closer) error {
// which would arrive here would match db.imm[0], because we acquire a
// lock over DB when pushing to flushChan.
// TODO: This logic is dirty AF. Any change and this could easily break.
for _, mt := range mts {
y.AssertTrue(mt == db.imm[0])
db.imm = db.imm[1:]
mt.DecrRef() // Return memory.
}
y.AssertTrue(ft.mt == db.imm[0])
db.imm = db.imm[1:]
ft.mt.DecrRef() // Return memory.
db.lock.Unlock()

for _, cb := range cbs {
if cb != nil {
cb()
}
}
break
}
// Encountered error. Retry indefinitely.
db.opt.Errorf("Failure while flushing memtable to disk: %v. Retrying...\n", err)
time.Sleep(time.Second)
}
// Reset everything.
itrs, mts, cbs, sz = itrs[:0], mts[:0], cbs[:0], 0
}
return nil
}
Expand Down
2 changes: 1 addition & 1 deletion db_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -2088,7 +2088,7 @@ func TestVerifyChecksum(t *testing.T) {
y.Check2(rand.Read(value))
st := 0

buf := z.NewBuffer(10<<20, "test")
buf := z.NewBuffer(10 << 20, "test")
defer buf.Release()
for i := 0; i < 1000; i++ {
key := make([]byte, 8)
Expand Down
2 changes: 1 addition & 1 deletion iterator.go
Original file line number Diff line number Diff line change
Expand Up @@ -146,7 +146,7 @@ func (item *Item) IsDeletedOrExpired() bool {
// DiscardEarlierVersions returns whether the item was created with the
// option to discard earlier versions of a key when multiple are available.
func (item *Item) DiscardEarlierVersions() bool {
return item.meta&BitDiscardEarlierVersions > 0
return item.meta&bitDiscardEarlierVersions > 0
}

func (item *Item) yieldItemValue() ([]byte, func(), error) {
Expand Down
4 changes: 2 additions & 2 deletions levels.go
Original file line number Diff line number Diff line change
Expand Up @@ -717,7 +717,7 @@ func (s *levelsController) subcompact(it y.Iterator, kr keyRange, cd compactDef,
}
lastKey = y.SafeCopy(lastKey, it.Key())
numVersions = 0
firstKeyHasDiscardSet = it.Value().Meta&BitDiscardEarlierVersions > 0
firstKeyHasDiscardSet = it.Value().Meta&bitDiscardEarlierVersions > 0

if len(tableKr.left) == 0 {
tableKr.left = y.SafeCopy(tableKr.left, it.Key())
Expand Down Expand Up @@ -754,7 +754,7 @@ func (s *levelsController) subcompact(it y.Iterator, kr keyRange, cd compactDef,
// - The `discardEarlierVersions` bit is set OR
// - We've already processed `NumVersionsToKeep` number of versions
// (including the current item being processed)
lastValidVersion := vs.Meta&BitDiscardEarlierVersions > 0 ||
lastValidVersion := vs.Meta&bitDiscardEarlierVersions > 0 ||
numVersions == s.kv.opt.NumVersionsToKeep

if isExpired || lastValidVersion {
Expand Down
34 changes: 17 additions & 17 deletions levels_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -724,11 +724,11 @@ func TestDiscardFirstVersion(t *testing.T) {

runBadgerTest(t, &opt, func(t *testing.T, db *DB) {
l0 := []keyValVersion{{"foo", "bar", 1, 0}}
l01 := []keyValVersion{{"foo", "bar", 2, BitDiscardEarlierVersions}}
l01 := []keyValVersion{{"foo", "bar", 2, bitDiscardEarlierVersions}}
l02 := []keyValVersion{{"foo", "bar", 3, 0}}
l03 := []keyValVersion{{"foo", "bar", 4, 0}}
l04 := []keyValVersion{{"foo", "bar", 9, 0}}
l05 := []keyValVersion{{"foo", "bar", 10, BitDiscardEarlierVersions}}
l05 := []keyValVersion{{"foo", "bar", 10, bitDiscardEarlierVersions}}

// Level 0 has all the tables.
createAndOpen(db, l0, 0)
Expand Down Expand Up @@ -759,11 +759,11 @@ func TestDiscardFirstVersion(t *testing.T) {
// - Version 1 is below DiscardTS and below the first "bitDiscardEarlierVersions"
// marker so IT WILL BE REMOVED.
ExpectedKeys := []keyValVersion{
{"foo", "bar", 10, BitDiscardEarlierVersions},
{"foo", "bar", 10, bitDiscardEarlierVersions},
{"foo", "bar", 9, 0},
{"foo", "bar", 4, 0},
{"foo", "bar", 3, 0},
{"foo", "bar", 2, BitDiscardEarlierVersions}}
{"foo", "bar", 2, bitDiscardEarlierVersions}}

getAllAndCheck(t, db, ExpectedKeys)
})
Expand Down Expand Up @@ -1077,15 +1077,15 @@ func TestSameLevel(t *testing.T) {
opt.LmaxCompaction = true
runBadgerTest(t, &opt, func(t *testing.T, db *DB) {
l6 := []keyValVersion{
{"A", "bar", 4, BitDiscardEarlierVersions}, {"A", "bar", 3, 0},
{"A", "bar", 4, bitDiscardEarlierVersions}, {"A", "bar", 3, 0},
{"A", "bar", 2, 0}, {"Afoo", "baz", 2, 0},
}
l61 := []keyValVersion{
{"B", "bar", 4, BitDiscardEarlierVersions}, {"B", "bar", 3, 0},
{"B", "bar", 4, bitDiscardEarlierVersions}, {"B", "bar", 3, 0},
{"B", "bar", 2, 0}, {"Bfoo", "baz", 2, 0},
}
l62 := []keyValVersion{
{"C", "bar", 4, BitDiscardEarlierVersions}, {"C", "bar", 3, 0},
{"C", "bar", 4, bitDiscardEarlierVersions}, {"C", "bar", 3, 0},
{"C", "bar", 2, 0}, {"Cfoo", "baz", 2, 0},
}
createAndOpen(db, l6, 6)
Expand All @@ -1094,11 +1094,11 @@ func TestSameLevel(t *testing.T) {
require.NoError(t, db.lc.validate())

getAllAndCheck(t, db, []keyValVersion{
{"A", "bar", 4, BitDiscardEarlierVersions}, {"A", "bar", 3, 0},
{"A", "bar", 4, bitDiscardEarlierVersions}, {"A", "bar", 3, 0},
{"A", "bar", 2, 0}, {"Afoo", "baz", 2, 0},
{"B", "bar", 4, BitDiscardEarlierVersions}, {"B", "bar", 3, 0},
{"B", "bar", 4, bitDiscardEarlierVersions}, {"B", "bar", 3, 0},
{"B", "bar", 2, 0}, {"Bfoo", "baz", 2, 0},
{"C", "bar", 4, BitDiscardEarlierVersions}, {"C", "bar", 3, 0},
{"C", "bar", 4, bitDiscardEarlierVersions}, {"C", "bar", 3, 0},
{"C", "bar", 2, 0}, {"Cfoo", "baz", 2, 0},
})

Expand All @@ -1114,11 +1114,11 @@ func TestSameLevel(t *testing.T) {
db.SetDiscardTs(3)
require.NoError(t, db.lc.runCompactDef(-1, 6, cdef))
getAllAndCheck(t, db, []keyValVersion{
{"A", "bar", 4, BitDiscardEarlierVersions}, {"A", "bar", 3, 0},
{"A", "bar", 4, bitDiscardEarlierVersions}, {"A", "bar", 3, 0},
{"A", "bar", 2, 0}, {"Afoo", "baz", 2, 0},
{"B", "bar", 4, BitDiscardEarlierVersions}, {"B", "bar", 3, 0},
{"B", "bar", 4, bitDiscardEarlierVersions}, {"B", "bar", 3, 0},
{"B", "bar", 2, 0}, {"Bfoo", "baz", 2, 0},
{"C", "bar", 4, BitDiscardEarlierVersions}, {"C", "bar", 3, 0},
{"C", "bar", 4, bitDiscardEarlierVersions}, {"C", "bar", 3, 0},
{"C", "bar", 2, 0}, {"Cfoo", "baz", 2, 0},
})

Expand All @@ -1135,9 +1135,9 @@ func TestSameLevel(t *testing.T) {
cdef.t.baseLevel = 1
require.NoError(t, db.lc.runCompactDef(-1, 6, cdef))
getAllAndCheck(t, db, []keyValVersion{
{"A", "bar", 4, BitDiscardEarlierVersions}, {"Afoo", "baz", 2, 0},
{"B", "bar", 4, BitDiscardEarlierVersions}, {"Bfoo", "baz", 2, 0},
{"C", "bar", 4, BitDiscardEarlierVersions}, {"Cfoo", "baz", 2, 0}})
{"A", "bar", 4, bitDiscardEarlierVersions}, {"Afoo", "baz", 2, 0},
{"B", "bar", 4, bitDiscardEarlierVersions}, {"Bfoo", "baz", 2, 0},
{"C", "bar", 4, bitDiscardEarlierVersions}, {"Cfoo", "baz", 2, 0}})
require.NoError(t, db.lc.validate())
})
}
Expand Down Expand Up @@ -1203,7 +1203,7 @@ func TestStaleDataCleanup(t *testing.T) {
for i := count; i > 0; i-- {
var meta byte
if i == 0 {
meta = BitDiscardEarlierVersions
meta = bitDiscardEarlierVersions
}
b.AddStaleKey(y.KeyWithTs(key, i), y.ValueStruct{Meta: meta, Value: val}, 0)
}
Expand Down
48 changes: 0 additions & 48 deletions managed_db_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -771,54 +771,6 @@ func TestWriteBatchDuplicate(t *testing.T) {
})
}

// TestWriteViaSkip verifies the skiplist-handover write path: keys are written
// directly into a skiplist obtained from db.NewSkiplist, handed to Badger via
// HandoverSkiplist, and then read back through a managed-mode transaction.
func TestWriteViaSkip(t *testing.T) {
	// Fixed-width keys/values so iteration order matches insertion order.
	key := func(i int) []byte {
		return []byte(fmt.Sprintf("%10d", i))
	}
	val := func(i int) []byte {
		return []byte(fmt.Sprintf("%128d", i))
	}
	opt := DefaultOptions("")
	// Handover is only permitted in managed-transactions mode.
	opt.managedTxns = true
	runBadgerTest(t, &opt, func(t *testing.T, db *DB) {
		s := db.NewSkiplist()
		for i := 0; i < 100; i++ {
			// MaxUint64 is a placeholder timestamp, rewritten below.
			s.Put(y.KeyWithTs(key(i), math.MaxUint64), y.ValueStruct{Value: val(i)})
		}
		{
			// Update key timestamps by directly changing them in the skiplist.
			itr := s.NewUniIterator(false)
			defer itr.Close()
			itr.Rewind()
			for itr.Valid() {
				y.SetKeyTs(itr.Key(), 101)
				itr.Next()
			}
		}

		// Hand over skiplist to Badger.
		require.NoError(t, db.HandoverSkiplist(s, nil))

		// Read the data back.
		txn := db.NewTransactionAt(101, false)
		defer txn.Discard()
		itr := txn.NewIterator(DefaultIteratorOptions)
		defer itr.Close()

		// Expect all 100 keys back, in order, at version 101.
		i := 0
		for itr.Rewind(); itr.Valid(); itr.Next() {
			item := itr.Item()
			require.Equal(t, string(key(i)), string(item.Key()))
			require.Equal(t, item.Version(), uint64(101))
			valcopy, err := item.ValueCopy(nil)
			require.NoError(t, err)
			require.Equal(t, val(i), valcopy)
			i++
		}
		require.Equal(t, 100, i)
	})
}

func TestZeroDiscardStats(t *testing.T) {
N := uint64(10000)
populate := func(t *testing.T, db *DB) {
Expand Down
Loading

0 comments on commit ff5c908

Please sign in to comment.