Skip to content

Commit

Permalink
feat(Skiplist): Introduce a way to hand over skiplists to Badger (dgr…
Browse files Browse the repository at this point in the history
…aph-io#1696)

In Dgraph, we already use Raft write-ahead log. Also, when we commit transactions, we update tens of thousands of keys in one go. To optimize this write path, this PR introduces a way to directly hand over Skiplist to Badger, short circuiting Badger's Value Log and WAL.

This feature allows Dgraph to generate Skiplists while processing mutations and just hand them over to Badger during commits. It also accepts a callback which can be run when Skiplist is written to disk. This is useful for determining when to create a snapshot in Dgraph.
  • Loading branch information
manishrjain authored and fredcarle committed Aug 1, 2023
1 parent 33ca02f commit bad5542
Show file tree
Hide file tree
Showing 12 changed files with 105 additions and 28 deletions.
2 changes: 1 addition & 1 deletion backup_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -446,7 +446,7 @@ func TestBackupLoadIncremental(t *testing.T) {
if err := txn.SetEntry(entry); err != nil {
return err
}
updates[i] = bitDiscardEarlierVersions
updates[i] = BitDiscardEarlierVersions
}
return nil
})
Expand Down
2 changes: 1 addition & 1 deletion iterator.go
Original file line number Diff line number Diff line change
Expand Up @@ -144,7 +144,7 @@ func (item *Item) IsDeletedOrExpired() bool {
// DiscardEarlierVersions returns whether the item was created with the
// option to discard earlier versions of a key when multiple are available.
func (item *Item) DiscardEarlierVersions() bool {
return item.meta&bitDiscardEarlierVersions > 0
return item.meta&BitDiscardEarlierVersions > 0
}

func (item *Item) yieldItemValue() ([]byte, func(), error) {
Expand Down
4 changes: 2 additions & 2 deletions levels.go
Original file line number Diff line number Diff line change
Expand Up @@ -716,7 +716,7 @@ func (s *levelsController) subcompact(it y.Iterator, kr keyRange, cd compactDef,
}
lastKey = y.SafeCopy(lastKey, it.Key())
numVersions = 0
firstKeyHasDiscardSet = it.Value().Meta&bitDiscardEarlierVersions > 0
firstKeyHasDiscardSet = it.Value().Meta&BitDiscardEarlierVersions > 0

if len(tableKr.left) == 0 {
tableKr.left = y.SafeCopy(tableKr.left, it.Key())
Expand Down Expand Up @@ -753,7 +753,7 @@ func (s *levelsController) subcompact(it y.Iterator, kr keyRange, cd compactDef,
// - The `discardEarlierVersions` bit is set OR
// - We've already processed `NumVersionsToKeep` number of versions
// (including the current item being processed)
lastValidVersion := vs.Meta&bitDiscardEarlierVersions > 0 ||
lastValidVersion := vs.Meta&BitDiscardEarlierVersions > 0 ||
numVersions == s.kv.opt.NumVersionsToKeep

if isExpired || lastValidVersion {
Expand Down
34 changes: 17 additions & 17 deletions levels_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -707,11 +707,11 @@ func TestDiscardFirstVersion(t *testing.T) {

runBadgerTest(t, &opt, func(t *testing.T, db *DB) {
l0 := []keyValVersion{{"foo", "bar", 1, 0}}
l01 := []keyValVersion{{"foo", "bar", 2, bitDiscardEarlierVersions}}
l01 := []keyValVersion{{"foo", "bar", 2, BitDiscardEarlierVersions}}
l02 := []keyValVersion{{"foo", "bar", 3, 0}}
l03 := []keyValVersion{{"foo", "bar", 4, 0}}
l04 := []keyValVersion{{"foo", "bar", 9, 0}}
l05 := []keyValVersion{{"foo", "bar", 10, bitDiscardEarlierVersions}}
l05 := []keyValVersion{{"foo", "bar", 10, BitDiscardEarlierVersions}}

// Level 0 has all the tables.
createAndOpen(db, l0, 0)
Expand Down Expand Up @@ -742,11 +742,11 @@ func TestDiscardFirstVersion(t *testing.T) {
// - Version 1 is below DiscardTS and below the first "bitDiscardEarlierVersions"
// marker so IT WILL BE REMOVED.
ExpectedKeys := []keyValVersion{
{"foo", "bar", 10, bitDiscardEarlierVersions},
{"foo", "bar", 10, BitDiscardEarlierVersions},
{"foo", "bar", 9, 0},
{"foo", "bar", 4, 0},
{"foo", "bar", 3, 0},
{"foo", "bar", 2, bitDiscardEarlierVersions}}
{"foo", "bar", 2, BitDiscardEarlierVersions}}

getAllAndCheck(t, db, ExpectedKeys)
})
Expand Down Expand Up @@ -1060,15 +1060,15 @@ func TestSameLevel(t *testing.T) {
opt.LmaxCompaction = true
runBadgerTest(t, &opt, func(t *testing.T, db *DB) {
l6 := []keyValVersion{
{"A", "bar", 4, bitDiscardEarlierVersions}, {"A", "bar", 3, 0},
{"A", "bar", 4, BitDiscardEarlierVersions}, {"A", "bar", 3, 0},
{"A", "bar", 2, 0}, {"Afoo", "baz", 2, 0},
}
l61 := []keyValVersion{
{"B", "bar", 4, bitDiscardEarlierVersions}, {"B", "bar", 3, 0},
{"B", "bar", 4, BitDiscardEarlierVersions}, {"B", "bar", 3, 0},
{"B", "bar", 2, 0}, {"Bfoo", "baz", 2, 0},
}
l62 := []keyValVersion{
{"C", "bar", 4, bitDiscardEarlierVersions}, {"C", "bar", 3, 0},
{"C", "bar", 4, BitDiscardEarlierVersions}, {"C", "bar", 3, 0},
{"C", "bar", 2, 0}, {"Cfoo", "baz", 2, 0},
}
createAndOpen(db, l6, 6)
Expand All @@ -1077,11 +1077,11 @@ func TestSameLevel(t *testing.T) {
require.NoError(t, db.lc.validate())

getAllAndCheck(t, db, []keyValVersion{
{"A", "bar", 4, bitDiscardEarlierVersions}, {"A", "bar", 3, 0},
{"A", "bar", 4, BitDiscardEarlierVersions}, {"A", "bar", 3, 0},
{"A", "bar", 2, 0}, {"Afoo", "baz", 2, 0},
{"B", "bar", 4, bitDiscardEarlierVersions}, {"B", "bar", 3, 0},
{"B", "bar", 4, BitDiscardEarlierVersions}, {"B", "bar", 3, 0},
{"B", "bar", 2, 0}, {"Bfoo", "baz", 2, 0},
{"C", "bar", 4, bitDiscardEarlierVersions}, {"C", "bar", 3, 0},
{"C", "bar", 4, BitDiscardEarlierVersions}, {"C", "bar", 3, 0},
{"C", "bar", 2, 0}, {"Cfoo", "baz", 2, 0},
})

Expand All @@ -1097,11 +1097,11 @@ func TestSameLevel(t *testing.T) {
db.SetDiscardTs(3)
require.NoError(t, db.lc.runCompactDef(-1, 6, cdef))
getAllAndCheck(t, db, []keyValVersion{
{"A", "bar", 4, bitDiscardEarlierVersions}, {"A", "bar", 3, 0},
{"A", "bar", 4, BitDiscardEarlierVersions}, {"A", "bar", 3, 0},
{"A", "bar", 2, 0}, {"Afoo", "baz", 2, 0},
{"B", "bar", 4, bitDiscardEarlierVersions}, {"B", "bar", 3, 0},
{"B", "bar", 4, BitDiscardEarlierVersions}, {"B", "bar", 3, 0},
{"B", "bar", 2, 0}, {"Bfoo", "baz", 2, 0},
{"C", "bar", 4, bitDiscardEarlierVersions}, {"C", "bar", 3, 0},
{"C", "bar", 4, BitDiscardEarlierVersions}, {"C", "bar", 3, 0},
{"C", "bar", 2, 0}, {"Cfoo", "baz", 2, 0},
})

Expand All @@ -1118,9 +1118,9 @@ func TestSameLevel(t *testing.T) {
cdef.t.baseLevel = 1
require.NoError(t, db.lc.runCompactDef(-1, 6, cdef))
getAllAndCheck(t, db, []keyValVersion{
{"A", "bar", 4, bitDiscardEarlierVersions}, {"Afoo", "baz", 2, 0},
{"B", "bar", 4, bitDiscardEarlierVersions}, {"Bfoo", "baz", 2, 0},
{"C", "bar", 4, bitDiscardEarlierVersions}, {"Cfoo", "baz", 2, 0}})
{"A", "bar", 4, BitDiscardEarlierVersions}, {"Afoo", "baz", 2, 0},
{"B", "bar", 4, BitDiscardEarlierVersions}, {"Bfoo", "baz", 2, 0},
{"C", "bar", 4, BitDiscardEarlierVersions}, {"Cfoo", "baz", 2, 0}})
require.NoError(t, db.lc.validate())
})
}
Expand Down Expand Up @@ -1267,7 +1267,7 @@ func TestStaleDataCleanup(t *testing.T) {
for i := count; i > 0; i-- {
var meta byte
if i == 0 {
meta = bitDiscardEarlierVersions
meta = BitDiscardEarlierVersions
}
b.AddStaleKey(y.KeyWithTs(key, i), y.ValueStruct{Meta: meta, Value: val}, 0)
}
Expand Down
48 changes: 48 additions & 0 deletions managed_db_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -772,6 +772,54 @@ func TestWriteBatchDuplicate(t *testing.T) {
})
}

func TestWriteViaSkip(t *testing.T) {
key := func(i int) []byte {
return []byte(fmt.Sprintf("%10d", i))
}
val := func(i int) []byte {
return []byte(fmt.Sprintf("%128d", i))
}
opt := DefaultOptions("")
opt.managedTxns = true
runBadgerTest(t, &opt, func(t *testing.T, db *DB) {
s := db.NewSkiplist()
for i := 0; i < 100; i++ {
s.Put(y.KeyWithTs(key(i), math.MaxUint64), y.ValueStruct{Value: val(i)})
}
{
// Update key timestamps by directly changing them in the skiplist.
itr := s.NewUniIterator(false)
defer itr.Close()
itr.Rewind()
for itr.Valid() {
y.SetKeyTs(itr.Key(), 101)
itr.Next()
}
}

// Hand over skiplist to Badger.
require.NoError(t, db.HandoverSkiplist(s, nil))

// Read the data back.
txn := db.NewTransactionAt(101, false)
defer txn.Discard()
itr := txn.NewIterator(DefaultIteratorOptions)
defer itr.Close()

i := 0
for itr.Rewind(); itr.Valid(); itr.Next() {
item := itr.Item()
require.Equal(t, string(key(i)), string(item.Key()))
require.Equal(t, item.Version(), uint64(101))
valcopy, err := item.ValueCopy(nil)
require.NoError(t, err)
require.Equal(t, val(i), valcopy)
i++
}
require.Equal(t, 100, i)
})
}

func TestZeroDiscardStats(t *testing.T) {
N := uint64(10000)
populate := func(t *testing.T, db *DB) {
Expand Down
2 changes: 1 addition & 1 deletion merge.go
Original file line number Diff line number Diff line change
Expand Up @@ -117,7 +117,7 @@ func (op *MergeOperator) compact() error {
{
Key: y.KeyWithTs(op.key, version),
Value: val,
meta: bitDiscardEarlierVersions,
meta: BitDiscardEarlierVersions,
},
}
// Write value back to the DB. It is important that we do not set the bitMergeEntry bit
Expand Down
2 changes: 1 addition & 1 deletion options.go
Original file line number Diff line number Diff line change
Expand Up @@ -150,7 +150,7 @@ func DefaultOptions(path string) Options {
NumCompactors: 4, // Run at least 2 compactors. Zero-th compactor prioritizes L0.
NumLevelZeroTables: 5,
NumLevelZeroTablesStall: 15,
NumMemtables: 5,
NumMemtables: 15,
BloomFalsePositive: 0.01,
BlockSize: 4 * 1024,
SyncWrites: false,
Expand Down
4 changes: 4 additions & 0 deletions skl/skl.go
Original file line number Diff line number Diff line change
Expand Up @@ -564,3 +564,7 @@ func (b *Builder) Add(k []byte, v y.ValueStruct) {
b.prev[i] = x
}
}

func (b *Builder) Skiplist() *Skiplist {
return b.s
}
2 changes: 1 addition & 1 deletion structs.go
Original file line number Diff line number Diff line change
Expand Up @@ -207,7 +207,7 @@ func (e *Entry) WithMeta(meta byte) *Entry {
// have a higher setting for NumVersionsToKeep (in Dgraph, we set it to infinity), you can use this
// method to indicate that all the older versions can be discarded and removed during compactions.
func (e *Entry) WithDiscard() *Entry {
e.meta = bitDiscardEarlierVersions
e.meta = BitDiscardEarlierVersions
return e
}

Expand Down
26 changes: 23 additions & 3 deletions txn.go
Original file line number Diff line number Diff line change
Expand Up @@ -359,9 +359,30 @@ func exceedsSize(prefix string, max int64, key []byte) error {
prefix, len(key), max, prefix, hex.Dump(key[:1<<10]))
}

func (txn *Txn) modify(e *Entry) error {
const maxKeySize = 65000
const maxKeySize = 65000
const maxValSize = 1 << 20

func ValidEntry(db *DB, key, val []byte) error {
switch {
case len(key) == 0:
return ErrEmptyKey
case bytes.HasPrefix(key, badgerPrefix):
return ErrInvalidKey
case len(key) > maxKeySize:
// Key length can't be more than uint16, as determined by table::header. To
// keep things safe and allow badger move prefix and a timestamp suffix, let's
// cut it down to 65000, instead of using 65536.
return exceedsSize("Key", maxKeySize, key)
case int64(len(val)) > maxValSize:
return exceedsSize("Value", maxValSize, val)
}
if err := db.isBanned(key); err != nil {
return err
}
return nil
}

func (txn *Txn) modify(e *Entry) error {
switch {
case !txn.update:
return ErrReadOnlyTxn
Expand All @@ -385,7 +406,6 @@ func (txn *Txn) modify(e *Entry) error {
if err := txn.db.isBanned(e.Key); err != nil {
return err
}

if err := txn.checkSize(e); err != nil {
return err
}
Expand Down
2 changes: 1 addition & 1 deletion value.go
Original file line number Diff line number Diff line change
Expand Up @@ -47,7 +47,7 @@ var maxVlogFileSize uint32 = math.MaxUint32
const (
bitDelete byte = 1 << 0 // Set if the key has been deleted.
bitValuePointer byte = 1 << 1 // Set if the value is NOT stored directly next to key.
bitDiscardEarlierVersions byte = 1 << 2 // Set if earlier versions can be discarded.
BitDiscardEarlierVersions byte = 1 << 2 // Set if earlier versions can be discarded.
// Set if item shouldn't be discarded via compactions (used by merge operator)
bitMergeEntry byte = 1 << 3
// The MSB 2 bits are for transactions.
Expand Down
5 changes: 5 additions & 0 deletions y/y.go
Original file line number Diff line number Diff line change
Expand Up @@ -117,6 +117,11 @@ func Copy(a []byte) []byte {
return b
}

func SetKeyTs(key []byte, ts uint64) {
start := len(key) - 8
binary.BigEndian.PutUint64(key[start:], math.MaxUint64-ts)
}

// KeyWithTs generates a new key by appending ts to key.
func KeyWithTs(key []byte, ts uint64) []byte {
out := make([]byte, len(key)+8)
Expand Down

0 comments on commit bad5542

Please sign in to comment.