Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

During backup, collapse split posting lists into a single list. #4682

Merged
merged 10 commits into from
Feb 10, 2020
25 changes: 12 additions & 13 deletions ee/backup/backup.go
Original file line number Diff line number Diff line change
Expand Up @@ -217,25 +217,24 @@ func (pr *Processor) toBackupList(key []byte, itr *badger.Iterator) (*bpb.KVList
if err != nil {
return nil, errors.Wrapf(err, "while reading posting list")
}
kvs, err := l.Rollup()

kv, err := l.SingleListRollup()
if err != nil {
return nil, errors.Wrapf(err, "while rolling up list")
}

for _, kv := range kvs {
backupKey, err := toBackupKey(kv.Key)
if err != nil {
return nil, err
}
kv.Key = backupKey
backupKey, err := toBackupKey(kv.Key)
if err != nil {
return nil, err
}
kv.Key = backupKey

backupPl, err := toBackupPostingList(kv.Value)
if err != nil {
return nil, err
}
kv.Value = backupPl
backupPl, err := toBackupPostingList(kv.Value)
if err != nil {
return nil, err
}
list.Kv = append(list.Kv, kvs...)
kv.Value = backupPl
list.Kv = append(list.Kv, kv)
case posting.BitSchemaPosting:
valCopy, err := item.ValueCopy(nil)
if err != nil {
Expand Down
53 changes: 40 additions & 13 deletions posting/list.go
Original file line number Diff line number Diff line change
Expand Up @@ -707,7 +707,7 @@ func (l *List) Length(readTs, afterUid uint64) int {
func (l *List) Rollup() ([]*bpb.KV, error) {
l.RLock()
defer l.RUnlock()
out, err := l.rollup(math.MaxUint64)
out, err := l.rollup(math.MaxUint64, true)
if err != nil {
return nil, err
}
Expand All @@ -734,6 +734,30 @@ func (l *List) Rollup() ([]*bpb.KV, error) {
return kvs, nil
}

// SingleListRollup works like rollup but generates a single list with no splits.
// It's used during backup so that each backed up posting list is stored in a single key.
func (l *List) SingleListRollup() (*bpb.KV, error) {
l.RLock()
defer l.RUnlock()

out, err := l.rollup(math.MaxUint64, false)
if err != nil {
return nil, err
}
// out is only nil when the list's minTs is greater than readTs but readTs
// is math.MaxUint64 so that's not possible. Assert that's true.
x.AssertTrue(out != nil)

kv := &bpb.KV{}
kv.Version = out.newMinTs
kv.Key = l.key
val, meta := marshalPostingList(out.plist)
kv.UserMeta = []byte{meta}
kv.Value = val

return kv, nil
}

func (out *rollupOutput) marshalPostingListPart(
baseKey []byte, startUid uint64, plist *pb.PostingList) *bpb.KV {
kv := &bpb.KV{}
Expand Down Expand Up @@ -769,7 +793,7 @@ type rollupOutput struct {
// Merge all entries in mutation layer with commitTs <= l.commitTs into
// immutable layer. Note that readTs can be math.MaxUint64, so do NOT use it
// directly. It should only serve as the read timestamp for iteration.
func (l *List) rollup(readTs uint64) (*rollupOutput, error) {
func (l *List) rollup(readTs uint64, split bool) (*rollupOutput, error) {
l.AssertRLock()

// Pick all committed entries
Expand All @@ -795,8 +819,7 @@ func (l *List) rollup(readTs uint64) (*rollupOutput, error) {
initializeSplit := func() {
enc = codec.Encoder{BlockSize: blockSize}

// Otherwise, load the corresponding part and set endUid to correctly
// detect the end of the list.
// Load the corresponding part and set endUid to correctly detect the end of the list.
startUid = l.plist.Splits[splitIdx]
if splitIdx+1 == len(l.plist.Splits) {
endUid = math.MaxUint64
Expand All @@ -808,15 +831,15 @@ func (l *List) rollup(readTs uint64) (*rollupOutput, error) {
}

// If not a multi-part list, all UIDs go to the same encoder.
if len(l.plist.Splits) == 0 {
if len(l.plist.Splits) == 0 || !split {
plist = out.plist
endUid = math.MaxUint64
} else {
initializeSplit()
}

err := l.iterate(readTs, 0, func(p *pb.Posting) error {
if p.Uid > endUid {
if p.Uid > endUid && split {
plist.Pack = enc.Done()
out.parts[startUid] = plist

Expand Down Expand Up @@ -852,11 +875,16 @@ func (l *List) rollup(readTs uint64) (*rollupOutput, error) {
}
}

// Check if the list (or any of it's parts if it's been previously split) have
// become too big. Split the list if that is the case.
out.newMinTs = maxCommitTs
out.splitUpList()
out.removeEmptySplits()
if split {
// Check if the list (or any of it's parts if it's been previously split) have
// become too big. Split the list if that is the case.
out.newMinTs = maxCommitTs
out.splitUpList()
out.removeEmptySplits()
} else {
out.plist.Splits = nil
}

return out, nil
}

Expand Down Expand Up @@ -1176,8 +1204,7 @@ func (l *List) readListPart(startUid uint64) (*pb.PostingList, error) {

// shouldSplit returns true if the given plist should be split in two.
func shouldSplit(plist *pb.PostingList) bool {
return false
// return plist.Size() >= maxListSize && len(plist.Pack.Blocks) > 1
return plist.Size() >= maxListSize && len(plist.Pack.Blocks) > 1
}

// splitUpList checks the list and splits it in smaller parts if needed.
Expand Down
Loading