From ecc1ac585f13004dcff894cecfb36be77f9cb7c2 Mon Sep 17 00:00:00 2001
From: Martin Martinez Rivera <martinmr@dgraph.io>
Date: Fri, 24 Jul 2020 13:19:52 -0700
Subject: [PATCH] refactor: Simplify how list splits are tracked. (#5920)

Currently the code keeps tracks of the split startUids and updates them
as the posting lists are split or empty parts are removed. Instead,
update the list of startUids at the end of each step based on the
contents of the map of startUids to posting list. This removes the need
of keeping track of the same piece of information in two different ways.

Also, fix a bug where the empty parts were removed from the list of
splits but not from the map. This caused empty posting lists to be
written to disk although it didn't affect normal execution because those
parts were not accessible from the main posting list.

(cherry picked from commit c51d007e7d53a0d313bab3e73e8536ff1fc455c0)
---
 posting/list.go      | 40 +++++++++++++++-------------------------
 posting/list_test.go | 12 ++++++++++++
 2 files changed, 27 insertions(+), 25 deletions(-)

diff --git a/posting/list.go b/posting/list.go
index 45bfef9bece..22599176133 100644
--- a/posting/list.go
+++ b/posting/list.go
@@ -1341,6 +1341,13 @@ func shouldSplit(plist *pb.PostingList) bool {
 	return plist.Size() >= maxListSize && len(plist.Pack.Blocks) > 1
 }
 
+func (out *rollupOutput) updateSplits() {
+	if out.plist == nil {
+		out.plist = &pb.PostingList{}
+	}
+	out.plist.Splits = out.splits()
+}
+
 func (out *rollupOutput) recursiveSplit() {
 	// Call splitUpList. Otherwise the map of startUids to parts won't be initialized.
 	out.splitUpList()
@@ -1367,7 +1374,7 @@ func (out *rollupOutput) splitUpList() {
 	var lists []*pb.PostingList
 
 	// If list is not split yet, insert the main list.
-	if len(out.plist.Splits) == 0 {
+	if len(out.parts) == 0 {
 		lists = append(lists, out.plist)
 	}
 
@@ -1377,13 +1384,10 @@ func (out *rollupOutput) splitUpList() {
 		lists = append(lists, part)
 	}
 
-	// List of startUids for each list part after the splitting process is complete.
-	var newSplits []uint64
-
 	for i, list := range lists {
 		startUid := uint64(1)
 		// If the list is split, select the right startUid for this list.
-		if len(out.plist.Splits) > 0 {
+		if len(out.parts) > 0 {
 			startUid = out.plist.Splits[i]
 		}
 
@@ -1393,23 +1397,11 @@ func (out *rollupOutput) splitUpList() {
 			startUids, pls := binSplit(startUid, list)
 			for i, startUid := range startUids {
 				out.parts[startUid] = pls[i]
-				newSplits = append(newSplits, startUid)
 			}
-		} else {
-			// No need to split the list. Add the startUid to the array of new splits.
-			newSplits = append(newSplits, startUid)
 		}
 	}
 
-	// No new lists were created so there's no need to update the list of splits.
-	if len(newSplits) == len(lists) {
-		return
-	}
-
-	// The splits changed so update them.
-	out.plist = &pb.PostingList{
-		Splits: newSplits,
-	}
+	out.updateSplits()
 }
 
 // binSplit takes the given plist and returns two new plists, each with
@@ -1447,28 +1439,26 @@ func binSplit(lowUid uint64, plist *pb.PostingList) ([]uint64, []*pb.PostingList
 
 // removeEmptySplits updates the split list by removing empty posting lists' startUids.
 func (out *rollupOutput) removeEmptySplits() {
-	var splits []uint64
 	for startUid, plist := range out.parts {
 		// Do not remove the first split for now, as every multi-part list should always
 		// have a split starting with UID 1.
 		if startUid == 1 {
-			splits = append(splits, startUid)
 			continue
 		}
 
-		if !isPlistEmpty(plist) {
-			splits = append(splits, startUid)
+		if isPlistEmpty(plist) {
+			delete(out.parts, startUid)
 		}
 	}
-	out.plist.Splits = splits
-	sortSplits(splits)
+	out.updateSplits()
 
-	if len(out.plist.Splits) == 1 {
+	if len(out.parts) == 1 && isPlistEmpty(out.parts[1]) {
 		// Only the first split remains. If it's also empty, remove it as well.
 		// This should mark the entire list for deletion. Please note that the
 		// startUid of the first part is always one because a node can never have
 		// its uid set to zero.
 		if isPlistEmpty(out.parts[1]) {
+			delete(out.parts, 1)
 			out.plist.Splits = []uint64{}
 		}
 	}
diff --git a/posting/list_test.go b/posting/list_test.go
index ac543c4b3d5..9f1ca3c46e8 100644
--- a/posting/list_test.go
+++ b/posting/list_test.go
@@ -895,6 +895,16 @@ func TestAfterUIDCountWithCommit(t *testing.T) {
 	require.EqualValues(t, 0, ol.Length(txn.StartTs, 300))
 }
 
+func verifySplits(t *testing.T, splits []uint64) {
+	require.Equal(t, uint64(1), splits[0])
+	for i, uid := range splits {
+		if i == 0 {
+			continue
+		}
+		require.Greater(t, uid, splits[i-1])
+	}
+}
+
 func createMultiPartList(t *testing.T, size int, addLabel bool) (*List, int) {
 	// For testing, set the max list size to a lower threshold.
 	maxListSize = 5000
@@ -936,6 +946,7 @@ func createMultiPartList(t *testing.T, size int, addLabel bool) (*List, int) {
 	ol, err = getNew(key, ps, math.MaxUint64)
 	require.NoError(t, err)
 	require.True(t, len(ol.plist.Splits) > 0)
+	verifySplits(t, ol.plist.Splits)
 
 	return ol, commits
 }
@@ -969,6 +980,7 @@ func createAndDeleteMultiPartList(t *testing.T, size int) (*List, int) {
 		commits++
 	}
 	require.True(t, len(ol.plist.Splits) > 0)
+	verifySplits(t, ol.plist.Splits)
 
 	// Delete all the previously inserted entries from the list.
 	baseStartTs := uint64(size) + 1