Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Split posting lists recursively. #4867

Merged
merged 5 commits into from
Mar 11, 2020
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
22 changes: 21 additions & 1 deletion posting/list.go
Original file line number Diff line number Diff line change
Expand Up @@ -895,7 +895,7 @@ func (l *List) rollup(readTs uint64, split bool) (*rollupOutput, error) {
// Check if the list (or any of its parts, if it has been previously split) has
// become too big. Split the list if that is the case.
out.newMinTs = maxCommitTs
out.splitUpList()
out.recursiveSplit()
out.removeEmptySplits()
} else {
out.plist.Splits = nil
Expand Down Expand Up @@ -1242,6 +1242,26 @@ func shouldSplit(plist *pb.PostingList) bool {
return plist.Size() >= maxListSize && len(plist.Pack.Blocks) > 1
}

// recursiveSplit repeatedly calls splitUpList until shouldSplit reports false
// for every entry in out.parts.
//
// A single pass of splitUpList may leave behind parts that are still over the
// size threshold, so the parts are re-checked after every pass and another
// pass is run while any of them still qualifies for splitting.
func (out *rollupOutput) recursiveSplit() {
	// Always run splitUpList at least once. Otherwise the map of startUids to
	// parts won't be initialized.
	out.splitUpList()

	// Keep calling splitUpList until none of the parts can be split further.
	for {
		needsSplit := false
		for _, part := range out.parts {
			if shouldSplit(part) {
				needsSplit = true
				// One oversized part is enough to require another pass, so
				// stop scanning instead of evaluating the remaining parts.
				break
			}
		}

		if !needsSplit {
			return
		}
		out.splitUpList()
	}
}

// splitUpList checks the list and splits it into smaller parts if needed.
func (out *rollupOutput) splitUpList() {
// Contains the posting lists that should be split.
Expand Down
51 changes: 51 additions & 0 deletions posting/list_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -1267,6 +1267,57 @@ func TestSingleListRollup(t *testing.T) {
}
}

// TestRecursiveSplits builds one large posting list without any intermediate
// rollups, rolls it up once, and verifies that the result was split into more
// than two parts and that every posting label can still be read back in the
// order it was written.
func TestRecursiveSplits(t *testing.T) {
	// For testing, set the max list size to a lower threshold.
	maxListSize = mb / 2
	defer func() {
		// NOTE(review): presumably math.MaxInt32 is the value the rest of the
		// test suite runs with; confirm against TestMain.
		maxListSize = math.MaxInt32
	}()

	// Create a list that should be split recursively.
	size := int(1e5)
	key := x.DataKey("recursive", 1331)
	ol, err := getNew(key, ps)
	require.NoError(t, err)
	commits := 0
	for i := 1; i <= size; i++ {
		commits++
		edge := &pb.DirectedEdge{
			ValueId: uint64(i),
		}
		edge.Label = strconv.Itoa(i)

		txn := Txn{StartTs: uint64(i)}
		addMutationHelper(t, ol, edge, Set, &txn)
		require.NoError(t, ol.commitMutation(uint64(i), uint64(i)+1))

		// Deliberately do not roll up the list inside the loop, so that the
		// single rollup below has to split the list more than once.
	}

	// Roll up the list. The final output should have more than two parts.
	kvs, err := ol.Rollup()
	require.NoError(t, err)
	require.NoError(t, writePostingListToDisk(kvs))
	ol, err = getNew(key, ps)
	require.NoError(t, err)
	require.True(t, len(ol.plist.Splits) > 2,
		"expected more than two splits, got %d", len(ol.plist.Splits))

	// Read back the list and verify the data is correct.
	var labels []string
	err = ol.Iterate(uint64(size)+1, 0, func(p *pb.Posting) error {
		if len(p.Label) > 0 {
			labels = append(labels, p.Label)
		}
		return nil
	})
	require.NoError(t, err)
	require.Equal(t, commits, len(labels))
	for i, label := range labels {
		// Expected value first, actual second, so that a failure report
		// labels the two values correctly.
		require.Equal(t, strconv.Itoa(i+1), label)
	}
}

var ps *badger.DB

func TestMain(m *testing.M) {
Expand Down