Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

fix(Dgraph): check for deleteBelowTs in pIterator.valid #7288

Merged
merged 6 commits into from
Jan 19, 2021
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
39 changes: 23 additions & 16 deletions posting/list.go
Original file line number Diff line number Diff line change
Expand Up @@ -114,7 +114,7 @@ type pIterator struct {
deleteBelowTs uint64
}

func (it *pIterator) init(l *List, afterUid, deleteBelowTs uint64) error {
func (it *pIterator) seek(l *List, afterUid, deleteBelowTs uint64) error {
if deleteBelowTs > 0 && deleteBelowTs <= l.minTs {
return errors.Errorf("deleteBelowTs (%d) must be greater than the minTs in the list (%d)",
deleteBelowTs, l.minTs)
Expand Down Expand Up @@ -209,24 +209,19 @@ func (it *pIterator) moveToNextValidPart() error {
return nil
}

// If there are no more UIDs to iterate over, move to the next part of the
// list that contains valid data.
if len(it.uids) == 0 {
for it.splitIdx <= len(it.l.plist.Splits)-2 {
// moveToNextPart will increment it.splitIdx. Therefore, the for loop must only
// continue until len(splits) - 2.
if err := it.moveToNextPart(); err != nil {
return err
}

if len(it.uids) > 0 {
return nil
}
// Iterate while there are no UIDs, and while we have more splits to iterate over.
for len(it.uids) == 0 && it.splitIdx < len(it.l.plist.Splits)-1 {
// moveToNextPart will increment it.splitIdx. Therefore, the for loop must only
// continue until len(splits)-1.
if err := it.moveToNextPart(); err != nil {
return err
}
}

return nil
}

// next advances pIterator to the next valid part.
func (it *pIterator) next() error {
if it.deleteBelowTs > 0 {
it.uids = nil
Expand All @@ -244,7 +239,14 @@ func (it *pIterator) next() error {
hex.EncodeToString(it.l.key))
}

// valid asserts that pIterator has valid uids, or advances it to the next valid part.
// It returns false if there are no more valid parts.
func (it *pIterator) valid() (bool, error) {
if it.deleteBelowTs > 0 {
it.uids = nil
return false, nil
}

if len(it.uids) > 0 {
return true, nil
}
Expand Down Expand Up @@ -566,7 +568,8 @@ func (l *List) setMutation(startTs uint64, data []byte) {
l.Unlock()
}

// Iterate will allow you to iterate over this posting List, while having acquired a read lock.
// Iterate will allow you to iterate over the mutable and immutable layers of
// this posting List, while having acquired a read lock.
// So, please keep this iteration cheap, otherwise mutations would get stuck.
// The iteration will start after the provided UID. The results would not include this uid.
// The function will loop until either the posting List is fully iterated, or you return a false
Expand Down Expand Up @@ -649,6 +652,7 @@ func (l *List) pickPostings(readTs uint64) (uint64, []*pb.Posting) {
func (l *List) iterate(readTs uint64, afterUid uint64, f func(obj *pb.Posting) error) error {
l.AssertRLock()

// mposts is the list of mutable postings
deleteBelowTs, mposts := l.pickPostings(readTs)
if readTs < l.minTs {
return errors.Errorf("readTs: %d less than minTs: %d for key: %q", readTs, l.minTs, l.key)
Expand All @@ -668,7 +672,9 @@ func (l *List) iterate(readTs uint64, afterUid uint64, f func(obj *pb.Posting) e
prevUid uint64
err error
)
err = pitr.init(l, afterUid, deleteBelowTs)

// pitr iterates through immutable postings
err = pitr.seek(l, afterUid, deleteBelowTs)
if err != nil {
return errors.Wrapf(err, "cannot initialize iterator when calling List.iterate")
}
Expand Down Expand Up @@ -1433,6 +1439,7 @@ func (l *List) Facets(readTs uint64, param *pb.FacetParams, langs []string,
return fcs, nil
}

// readListPart reads one split of a posting list from Badger.
func (l *List) readListPart(startUid uint64) (*pb.PostingList, error) {
key, err := x.SplitKey(l.key, startUid)
if err != nil {
Expand Down
58 changes: 45 additions & 13 deletions posting/list_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -36,6 +36,10 @@ import (
"github.com/dgraph-io/dgraph/x"
)

func setMaxListSize(newMaxListSize int) {
maxListSize = newMaxListSize
}

func (l *List) PostingList() *pb.PostingList {
l.RLock()
defer l.RUnlock()
Expand Down Expand Up @@ -451,6 +455,7 @@ func TestAddMutation_mrjn1(t *testing.T) {

func TestMillion(t *testing.T) {
// Ensure list is stored in a single part.
defer setMaxListSize(maxListSize)
maxListSize = math.MaxInt32

key := x.DataKey("bal", 1331)
Expand Down Expand Up @@ -907,10 +912,8 @@ func verifySplits(t *testing.T, splits []uint64) {

func createMultiPartList(t *testing.T, size int, addLabel bool) (*List, int) {
// For testing, set the max list size to a lower threshold.
defer setMaxListSize(maxListSize)
maxListSize = 5000
defer func() {
maxListSize = math.MaxInt32
}()

key := x.DataKey(uuid.New().String(), 1331)
ol, err := getNew(key, ps, math.MaxUint64)
Expand Down Expand Up @@ -955,10 +958,8 @@ func createMultiPartList(t *testing.T, size int, addLabel bool) (*List, int) {

func createAndDeleteMultiPartList(t *testing.T, size int) (*List, int) {
// For testing, set the max list size to a lower threshold.
maxListSize = 5000
defer func() {
maxListSize = math.MaxInt32
}()
defer setMaxListSize(maxListSize)
maxListSize = 10000

key := x.DataKey(uuid.New().String(), 1331)
ol, err := getNew(key, ps, math.MaxUint64)
Expand Down Expand Up @@ -1007,6 +1008,41 @@ func createAndDeleteMultiPartList(t *testing.T, size int) (*List, int) {
return ol, commits
}

func TestDeleteStarMultiPartList(t *testing.T) {
numEdges := 10000

list, _ := createMultiPartList(t, numEdges, false)
parsedKey, err := x.Parse(list.key)
require.NoError(t, err)

validateCount := func(expected int) {
count := 0
list.Iterate(math.MaxUint64, 0, func(posting *pb.Posting) error {
count++
return nil
})
require.Equal(t, expected, count)
}
validateCount(numEdges)

readTs := list.maxTs + 1
commitTs := readTs + 1

txn := NewTxn(readTs)
edge := &pb.DirectedEdge{
ValueId: parsedKey.Uid,
Attr: parsedKey.Attr,
Value: []byte(x.Star),
Op: pb.DirectedEdge_DEL,
}
err = list.addMutation(context.Background(), txn, edge)
require.NoError(t, err)

err = list.commitMutation(readTs, commitTs)
require.NoError(t, err)
validateCount(0)
}

func writePostingListToDisk(kvs []*bpb.KV) error {
writer := NewTxnWriter(pstore)
for _, kv := range kvs {
Expand Down Expand Up @@ -1147,10 +1183,8 @@ func TestMultiPartListDelete(t *testing.T) {
func TestMultiPartListDeleteAndAdd(t *testing.T) {
size := int(1e5)
// For testing, set the max list size to a lower threshold.
defer setMaxListSize(maxListSize)
maxListSize = 5000
defer func() {
maxListSize = math.MaxInt32
}()

// Add entries to the maps.
key := x.DataKey(uuid.New().String(), 1331)
Expand Down Expand Up @@ -1285,10 +1319,8 @@ func TestSingleListRollup(t *testing.T) {

func TestRecursiveSplits(t *testing.T) {
// For testing, set the max list size to a lower threshold.
defer setMaxListSize(maxListSize)
maxListSize = mb / 2
defer func() {
maxListSize = math.MaxInt32
}()

// Create a list that should be split recursively.
size := int(1e5)
Expand Down