Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat(Skiplist): Introduce a way to hand over skiplists to Badger #1696

Merged
merged 12 commits into from
Apr 27, 2021
2 changes: 1 addition & 1 deletion backup_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -446,7 +446,7 @@ func TestBackupLoadIncremental(t *testing.T) {
if err := txn.SetEntry(entry); err != nil {
return err
}
updates[i] = bitDiscardEarlierVersions
updates[i] = BitDiscardEarlierVersions
}
return nil
})
Expand Down
134 changes: 112 additions & 22 deletions db.go
Original file line number Diff line number Diff line change
Expand Up @@ -763,16 +763,9 @@ var requestPool = sync.Pool{
}

func (db *DB) writeToLSM(b *request) error {
// We should check the length of b.Prts and b.Entries only when badger is not
// running in InMemory mode. In InMemory mode, we don't write anything to the
// value log and that's why the length of b.Ptrs will always be zero.
if !db.opt.InMemory && len(b.Ptrs) != len(b.Entries) {
return errors.Errorf("Ptrs and Entries don't match: %+v", b)
}

for i, entry := range b.Entries {
var err error
if entry.skipVlogAndSetThreshold(db.valueThreshold()) {
if db.opt.managedTxns || entry.skipVlogAndSetThreshold(db.valueThreshold()) {
// Will include deletion / tombstone case.
err = db.mt.Put(entry.Key,
y.ValueStruct{
Expand Down Expand Up @@ -818,10 +811,13 @@ func (db *DB) writeRequests(reqs []*request) error {
}
}
db.opt.Debugf("writeRequests called. Writing to value log")
err := db.vlog.write(reqs)
if err != nil {
done(err)
return err
if !db.opt.managedTxns {
// Don't do value log writes in managed mode.
err := db.vlog.write(reqs)
if err != nil {
done(err)
return err
}
}

db.opt.Debugf("Sending updates to subscribers")
Expand All @@ -834,6 +830,7 @@ func (db *DB) writeRequests(reqs []*request) error {
}
count += len(b.Entries)
var i uint64
var err error
for err = db.ensureRoomForWrite(); err == errNoRoom; err = db.ensureRoomForWrite() {
i++
if i%100 == 0 {
Expand Down Expand Up @@ -1010,16 +1007,61 @@ func (db *DB) ensureRoomForWrite() error {
}
}

func (db *DB) HandoverSkiplist(skl *skl.Skiplist, callback func()) error {
if !db.opt.managedTxns {
panic("Handover Skiplist is only available in managed mode.")
}
db.lock.Lock()
defer db.lock.Unlock()

// If we have some data in db.mt, we should push that first, so the ordering of writes is
// maintained.
if !db.mt.sl.Empty() {
sz := db.mt.sl.MemSize()
db.opt.Infof("Handover found %d B data in current memtable. Pushing to flushChan.", sz)
var err error
select {
case db.flushChan <- flushTask{mt: db.mt}:
db.imm = append(db.imm, db.mt)
db.mt, err = db.newMemTable()
if err != nil {
return y.Wrapf(err, "cannot push current memtable")
}
default:
return errNoRoom
}
}

mt := &memTable{sl: skl}
select {
case db.flushChan <- flushTask{mt: mt, cb: callback}:
db.imm = append(db.imm, mt)
return nil
default:
return errNoRoom
}
}

func arenaSize(opt Options) int64 {
return opt.MemTableSize + opt.maxBatchSize + opt.maxBatchCount*int64(skl.MaxNodeSize)
}

func (db *DB) NewSkiplist() *skl.Skiplist {
return skl.NewSkiplist(arenaSize(db.opt))
}

// buildL0Table builds a new table from the memtable.
func buildL0Table(ft flushTask, bopts table.Options) *table.Builder {
iter := ft.mt.sl.NewIterator()
var iter y.Iterator
if ft.itr != nil {
iter = ft.itr
} else {
iter = ft.mt.sl.NewUniIterator(false)
}
defer iter.Close()

b := table.NewTableBuilder(bopts)
for iter.SeekToFirst(); iter.Valid(); iter.Next() {
for iter.Rewind(); iter.Valid(); iter.Next() {
if len(ft.dropPrefixes) > 0 && hasAnyPrefixes(iter.Key(), ft.dropPrefixes) {
continue
}
Expand All @@ -1035,16 +1077,14 @@ func buildL0Table(ft flushTask, bopts table.Options) *table.Builder {

type flushTask struct {
mt *memTable
cb func()
itr y.Iterator
dropPrefixes [][]byte
}

// handleFlushTask must be run serially.
func (db *DB) handleFlushTask(ft flushTask) error {
// There can be a scenario, when empty memtable is flushed.
if ft.mt.sl.Empty() {
return nil
}

// ft.mt could be nil with ft.itr being the valid field.
bopts := buildTableOptions(db)
builder := buildL0Table(ft, bopts)
defer builder.Close()
Expand Down Expand Up @@ -1080,11 +1120,52 @@ func (db *DB) handleFlushTask(ft flushTask) error {
func (db *DB) flushMemtable(lc *z.Closer) error {
defer lc.Done()

var sz int64
var itrs []y.Iterator
var mts []*memTable
var cbs []func()
slurp := func() {
for {
select {
case more := <-db.flushChan:
if more.mt == nil {
return
}
sl := more.mt.sl
itrs = append(itrs, sl.NewUniIterator(false))
mts = append(mts, more.mt)
cbs = append(cbs, more.cb)

sz += sl.MemSize()
if sz > db.opt.MemTableSize {
return
}
default:
return
}
}
}

for ft := range db.flushChan {
if ft.mt == nil {
// We close db.flushChan now, instead of sending a nil ft.mt.
continue
}
sz = ft.mt.sl.MemSize()
// Reset of itrs, mts etc. is being done below.
y.AssertTrue(len(itrs) == 0 && len(mts) == 0 && len(cbs) == 0)
itrs = append(itrs, ft.mt.sl.NewUniIterator(false))
mts = append(mts, ft.mt)
cbs = append(cbs, ft.cb)

// Pick more memtables, so we can really fill up the L0 table.
slurp()

// db.opt.Infof("Picked %d memtables. Size: %d\n", len(itrs), sz)
ft.mt = nil
ft.itr = table.NewMergeIterator(itrs, false)
ft.cb = nil

for {
err := db.handleFlushTask(ft)
if err == nil {
Expand All @@ -1095,17 +1176,26 @@ func (db *DB) flushMemtable(lc *z.Closer) error {
// which would arrive here would match db.imm[0], because we acquire a
// lock over DB when pushing to flushChan.
// TODO: This logic is dirty AF. Any change and this could easily break.
y.AssertTrue(ft.mt == db.imm[0])
db.imm = db.imm[1:]
ft.mt.DecrRef() // Return memory.
for _, mt := range mts {
y.AssertTrue(mt == db.imm[0])
db.imm = db.imm[1:]
mt.DecrRef() // Return memory.
}
db.lock.Unlock()

for _, cb := range cbs {
if cb != nil {
cb()
}
}
break
}
// Encountered error. Retry indefinitely.
db.opt.Errorf("Failure while flushing memtable to disk: %v. Retrying...\n", err)
time.Sleep(time.Second)
}
// Reset everything.
itrs, mts, cbs, sz = itrs[:0], mts[:0], cbs[:0], 0
}
return nil
}
Expand Down
2 changes: 1 addition & 1 deletion db_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -2088,7 +2088,7 @@ func TestVerifyChecksum(t *testing.T) {
y.Check2(rand.Read(value))
st := 0

buf := z.NewBuffer(10 << 20, "test")
buf := z.NewBuffer(10<<20, "test")
defer buf.Release()
for i := 0; i < 1000; i++ {
key := make([]byte, 8)
Expand Down
2 changes: 1 addition & 1 deletion iterator.go
Original file line number Diff line number Diff line change
Expand Up @@ -146,7 +146,7 @@ func (item *Item) IsDeletedOrExpired() bool {
// DiscardEarlierVersions returns whether the item was created with the
// option to discard earlier versions of a key when multiple are available.
func (item *Item) DiscardEarlierVersions() bool {
return item.meta&bitDiscardEarlierVersions > 0
return item.meta&BitDiscardEarlierVersions > 0
}

func (item *Item) yieldItemValue() ([]byte, func(), error) {
Expand Down
4 changes: 2 additions & 2 deletions levels.go
Original file line number Diff line number Diff line change
Expand Up @@ -716,7 +716,7 @@ func (s *levelsController) subcompact(it y.Iterator, kr keyRange, cd compactDef,
}
lastKey = y.SafeCopy(lastKey, it.Key())
numVersions = 0
firstKeyHasDiscardSet = it.Value().Meta&bitDiscardEarlierVersions > 0
firstKeyHasDiscardSet = it.Value().Meta&BitDiscardEarlierVersions > 0

if len(tableKr.left) == 0 {
tableKr.left = y.SafeCopy(tableKr.left, it.Key())
Expand Down Expand Up @@ -753,7 +753,7 @@ func (s *levelsController) subcompact(it y.Iterator, kr keyRange, cd compactDef,
// - The `discardEarlierVersions` bit is set OR
// - We've already processed `NumVersionsToKeep` number of versions
// (including the current item being processed)
lastValidVersion := vs.Meta&bitDiscardEarlierVersions > 0 ||
lastValidVersion := vs.Meta&BitDiscardEarlierVersions > 0 ||
numVersions == s.kv.opt.NumVersionsToKeep

if isExpired || lastValidVersion {
Expand Down
34 changes: 17 additions & 17 deletions levels_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -707,11 +707,11 @@ func TestDiscardFirstVersion(t *testing.T) {

runBadgerTest(t, &opt, func(t *testing.T, db *DB) {
l0 := []keyValVersion{{"foo", "bar", 1, 0}}
l01 := []keyValVersion{{"foo", "bar", 2, bitDiscardEarlierVersions}}
l01 := []keyValVersion{{"foo", "bar", 2, BitDiscardEarlierVersions}}
l02 := []keyValVersion{{"foo", "bar", 3, 0}}
l03 := []keyValVersion{{"foo", "bar", 4, 0}}
l04 := []keyValVersion{{"foo", "bar", 9, 0}}
l05 := []keyValVersion{{"foo", "bar", 10, bitDiscardEarlierVersions}}
l05 := []keyValVersion{{"foo", "bar", 10, BitDiscardEarlierVersions}}

// Level 0 has all the tables.
createAndOpen(db, l0, 0)
Expand Down Expand Up @@ -742,11 +742,11 @@ func TestDiscardFirstVersion(t *testing.T) {
// - Version 1 is below DiscardTS and below the first "bitDiscardEarlierVersions"
// marker so IT WILL BE REMOVED.
ExpectedKeys := []keyValVersion{
{"foo", "bar", 10, bitDiscardEarlierVersions},
{"foo", "bar", 10, BitDiscardEarlierVersions},
{"foo", "bar", 9, 0},
{"foo", "bar", 4, 0},
{"foo", "bar", 3, 0},
{"foo", "bar", 2, bitDiscardEarlierVersions}}
{"foo", "bar", 2, BitDiscardEarlierVersions}}

getAllAndCheck(t, db, ExpectedKeys)
})
Expand Down Expand Up @@ -1060,15 +1060,15 @@ func TestSameLevel(t *testing.T) {
opt.LmaxCompaction = true
runBadgerTest(t, &opt, func(t *testing.T, db *DB) {
l6 := []keyValVersion{
{"A", "bar", 4, bitDiscardEarlierVersions}, {"A", "bar", 3, 0},
{"A", "bar", 4, BitDiscardEarlierVersions}, {"A", "bar", 3, 0},
{"A", "bar", 2, 0}, {"Afoo", "baz", 2, 0},
}
l61 := []keyValVersion{
{"B", "bar", 4, bitDiscardEarlierVersions}, {"B", "bar", 3, 0},
{"B", "bar", 4, BitDiscardEarlierVersions}, {"B", "bar", 3, 0},
{"B", "bar", 2, 0}, {"Bfoo", "baz", 2, 0},
}
l62 := []keyValVersion{
{"C", "bar", 4, bitDiscardEarlierVersions}, {"C", "bar", 3, 0},
{"C", "bar", 4, BitDiscardEarlierVersions}, {"C", "bar", 3, 0},
{"C", "bar", 2, 0}, {"Cfoo", "baz", 2, 0},
}
createAndOpen(db, l6, 6)
Expand All @@ -1077,11 +1077,11 @@ func TestSameLevel(t *testing.T) {
require.NoError(t, db.lc.validate())

getAllAndCheck(t, db, []keyValVersion{
{"A", "bar", 4, bitDiscardEarlierVersions}, {"A", "bar", 3, 0},
{"A", "bar", 4, BitDiscardEarlierVersions}, {"A", "bar", 3, 0},
{"A", "bar", 2, 0}, {"Afoo", "baz", 2, 0},
{"B", "bar", 4, bitDiscardEarlierVersions}, {"B", "bar", 3, 0},
{"B", "bar", 4, BitDiscardEarlierVersions}, {"B", "bar", 3, 0},
{"B", "bar", 2, 0}, {"Bfoo", "baz", 2, 0},
{"C", "bar", 4, bitDiscardEarlierVersions}, {"C", "bar", 3, 0},
{"C", "bar", 4, BitDiscardEarlierVersions}, {"C", "bar", 3, 0},
{"C", "bar", 2, 0}, {"Cfoo", "baz", 2, 0},
})

Expand All @@ -1097,11 +1097,11 @@ func TestSameLevel(t *testing.T) {
db.SetDiscardTs(3)
require.NoError(t, db.lc.runCompactDef(-1, 6, cdef))
getAllAndCheck(t, db, []keyValVersion{
{"A", "bar", 4, bitDiscardEarlierVersions}, {"A", "bar", 3, 0},
{"A", "bar", 4, BitDiscardEarlierVersions}, {"A", "bar", 3, 0},
{"A", "bar", 2, 0}, {"Afoo", "baz", 2, 0},
{"B", "bar", 4, bitDiscardEarlierVersions}, {"B", "bar", 3, 0},
{"B", "bar", 4, BitDiscardEarlierVersions}, {"B", "bar", 3, 0},
{"B", "bar", 2, 0}, {"Bfoo", "baz", 2, 0},
{"C", "bar", 4, bitDiscardEarlierVersions}, {"C", "bar", 3, 0},
{"C", "bar", 4, BitDiscardEarlierVersions}, {"C", "bar", 3, 0},
{"C", "bar", 2, 0}, {"Cfoo", "baz", 2, 0},
})

Expand All @@ -1118,9 +1118,9 @@ func TestSameLevel(t *testing.T) {
cdef.t.baseLevel = 1
require.NoError(t, db.lc.runCompactDef(-1, 6, cdef))
getAllAndCheck(t, db, []keyValVersion{
{"A", "bar", 4, bitDiscardEarlierVersions}, {"Afoo", "baz", 2, 0},
{"B", "bar", 4, bitDiscardEarlierVersions}, {"Bfoo", "baz", 2, 0},
{"C", "bar", 4, bitDiscardEarlierVersions}, {"Cfoo", "baz", 2, 0}})
{"A", "bar", 4, BitDiscardEarlierVersions}, {"Afoo", "baz", 2, 0},
{"B", "bar", 4, BitDiscardEarlierVersions}, {"Bfoo", "baz", 2, 0},
{"C", "bar", 4, BitDiscardEarlierVersions}, {"Cfoo", "baz", 2, 0}})
require.NoError(t, db.lc.validate())
})
}
Expand Down Expand Up @@ -1186,7 +1186,7 @@ func TestStaleDataCleanup(t *testing.T) {
for i := count; i > 0; i-- {
var meta byte
if i == 0 {
meta = bitDiscardEarlierVersions
meta = BitDiscardEarlierVersions
}
b.AddStaleKey(y.KeyWithTs(key, i), y.ValueStruct{Meta: meta, Value: val}, 0)
}
Expand Down
Loading