Skip to content

Commit

Permalink
Split posting lists recursively. (#4867)
Browse files Browse the repository at this point in the history
Currently, split posting lists are split in one go. This means that the
resulting splits will be added to the list even if they are still too
big. This change recursively splits the parts of the lists until all of
them are smaller than the threshold.
  • Loading branch information
martinmr authored Mar 11, 2020
1 parent 4213734 commit 5fcb59b
Show file tree
Hide file tree
Showing 2 changed files with 72 additions and 1 deletion.
22 changes: 21 additions & 1 deletion posting/list.go
Original file line number Diff line number Diff line change
Expand Up @@ -895,7 +895,7 @@ func (l *List) rollup(readTs uint64, split bool) (*rollupOutput, error) {
// Check if the list (or any of its parts if it has been previously split) has
// become too big. Split the list if that is the case.
out.newMinTs = maxCommitTs
out.splitUpList()
out.recursiveSplit()
out.removeEmptySplits()
} else {
out.plist.Splits = nil
Expand Down Expand Up @@ -1242,6 +1242,26 @@ func shouldSplit(plist *pb.PostingList) bool {
return plist.Size() >= maxListSize && len(plist.Pack.Blocks) > 1
}

// recursiveSplit calls splitUpList repeatedly until no entry in out.parts
// satisfies shouldSplit any longer.
func (out *rollupOutput) recursiveSplit() {
	// The first pass runs unconditionally: without at least one call to
	// splitUpList the map of startUids to parts won't be initialized.
	for pending := true; pending; {
		out.splitUpList()

		// Re-examine every part; a single oversized part triggers another pass.
		pending = false
		for _, p := range out.parts {
			if shouldSplit(p) {
				pending = true
			}
		}
	}
}

// splitUpList checks the list and splits it into smaller parts if needed.
func (out *rollupOutput) splitUpList() {
// Contains the posting lists that should be split.
Expand Down
51 changes: 51 additions & 0 deletions posting/list_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -1267,6 +1267,57 @@ func TestSingleListRollup(t *testing.T) {
}
}

// TestRecursiveSplits builds a large list without any intermediate rollups,
// rolls it up once, and checks that the result was divided into more than two
// splits and that every posting can still be read back in order.
func TestRecursiveSplits(t *testing.T) {
	// For testing, set the max list size to a lower threshold.
	maxListSize = mb / 2
	defer func() {
		// NOTE(review): this resets to math.MaxInt32 rather than to the value
		// maxListSize held on entry; kept as-is to preserve existing behavior.
		maxListSize = math.MaxInt32
	}()

	// Create a list that should be split recursively.
	size := int(1e5)
	key := x.DataKey("recursive", 1331)
	ol, err := getNew(key, ps)
	require.NoError(t, err)
	for i := 1; i <= size; i++ {
		edge := &pb.DirectedEdge{
			ValueId: uint64(i),
			Label:   strconv.Itoa(i),
		}

		txn := Txn{StartTs: uint64(i)}
		addMutationHelper(t, ol, edge, Set, &txn)
		require.NoError(t, ol.commitMutation(uint64(i), uint64(i)+1))

		// Do not roll-up the list here to ensure the final list should
		// be split more than once.
	}

	// Rollup the list. The final output should have more than two parts.
	kvs, err := ol.Rollup()
	require.NoError(t, err)
	require.NoError(t, writePostingListToDisk(kvs))
	ol, err = getNew(key, ps)
	require.NoError(t, err)
	require.True(t, len(ol.plist.Splits) > 2)

	// Read back the list and verify the data is correct.
	var labels []string
	err = ol.Iterate(uint64(size)+1, 0, func(p *pb.Posting) error {
		if len(p.Label) > 0 {
			labels = append(labels, p.Label)
		}
		return nil
	})
	require.NoError(t, err)

	// One labeled posting is expected per committed edge, in insertion order.
	// Expected values are passed first so failure output labels them correctly.
	require.Equal(t, size, len(labels))
	for i, label := range labels {
		require.Equal(t, strconv.Itoa(i+1), label)
	}
}

// ps is the Badger DB handle that tests in this file pass to getNew.
// NOTE(review): presumably initialized in TestMain — confirm.
var ps *badger.DB

func TestMain(m *testing.M) {
Expand Down

0 comments on commit 5fcb59b

Please sign in to comment.