Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

release/v1.2: Cherry-pick commits to enable and fix posting-list splits. #4742

Merged
merged 3 commits into from
Feb 6, 2020
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
3 changes: 1 addition & 2 deletions posting/list.go
Original file line number Diff line number Diff line change
Expand Up @@ -1176,8 +1176,7 @@ func (l *List) readListPart(startUid uint64) (*pb.PostingList, error) {

// shouldSplit returns true if the given plist should be split in two.
func shouldSplit(plist *pb.PostingList) bool {
return false
// return plist.Size() >= maxListSize && len(plist.Pack.Blocks) > 1
return plist.Size() >= maxListSize && len(plist.Pack.Blocks) > 1
}

// splitUpList checks the list and splits it in smaller parts if needed.
Expand Down
8 changes: 3 additions & 5 deletions posting/list_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -931,6 +931,7 @@ func createMultiPartList(t *testing.T, size int, addLabel bool) (*List, int) {
require.NoError(t, writePostingListToDisk(kvs))
ol, err = getNew(key, ps)
require.NoError(t, err)
require.True(t, len(ol.plist.Splits) > 0)

return ol, commits
}
Expand Down Expand Up @@ -963,6 +964,7 @@ func createAndDeleteMultiPartList(t *testing.T, size int) (*List, int) {
}
commits++
}
require.True(t, len(ol.plist.Splits) > 0)

// Delete all the previously inserted entries from the list.
baseStartTs := uint64(size) + 1
Expand All @@ -982,6 +984,7 @@ func createAndDeleteMultiPartList(t *testing.T, size int) (*List, int) {
}
commits++
}
require.Equal(t, 0, len(ol.plist.Splits))

return ol, commits
}
Expand All @@ -1000,7 +1003,6 @@ func writePostingListToDisk(kvs []*bpb.KV) error {
func TestMultiPartListBasic(t *testing.T) {
size := int(1e5)
ol, commits := createMultiPartList(t, size, false)
t.Logf("List parts %v", len(ol.plist.Splits))
opt := ListOptions{ReadTs: uint64(size) + 1}
l, err := ol.Uids(opt)
require.NoError(t, err)
Expand All @@ -1014,7 +1016,6 @@ func TestMultiPartListBasic(t *testing.T) {
func TestMultiPartListIterAfterUid(t *testing.T) {
size := int(1e5)
ol, _ := createMultiPartList(t, size, false)
t.Logf("List parts %v", len(ol.plist.Splits))

var visitedUids []uint64
ol.Iterate(uint64(size+1), 50000, func(p *pb.Posting) error {
Expand All @@ -1031,7 +1032,6 @@ func TestMultiPartListIterAfterUid(t *testing.T) {
func TestMultiPartListWithPostings(t *testing.T) {
size := int(1e5)
ol, commits := createMultiPartList(t, size, true)
t.Logf("List parts %v", len(ol.plist.Splits))

var labels []string
err := ol.Iterate(uint64(size)+1, 0, func(p *pb.Posting) error {
Expand All @@ -1051,7 +1051,6 @@ func TestMultiPartListWithPostings(t *testing.T) {
func TestMultiPartListMarshal(t *testing.T) {
size := int(1e5)
ol, _ := createMultiPartList(t, size, false)
t.Logf("List parts %v", len(ol.plist.Splits))

kvs, err := ol.Rollup()
require.NoError(t, err)
Expand Down Expand Up @@ -1108,7 +1107,6 @@ func TestMultiPartListWriteToDisk(t *testing.T) {
func TestMultiPartListDelete(t *testing.T) {
size := int(1e4)
ol, commits := createAndDeleteMultiPartList(t, size)
t.Logf("List parts %v", len(ol.plist.Splits))
require.Equal(t, size*2, commits)

counter := 0
Expand Down
28 changes: 28 additions & 0 deletions posting/mvcc.go
Original file line number Diff line number Diff line change
Expand Up @@ -33,6 +33,9 @@ import (
var (
// ErrTsTooOld is returned when a transaction is too old to be applied.
ErrTsTooOld = errors.Errorf("Transaction is too old")
// ErrInvalidKey is returned when trying to read a posting list using
// an invalid key (e.g the key to a single part of a larger multi-part list).
ErrInvalidKey = errors.Errorf("cannot read posting list from this key")
)

// ShouldAbort returns whether the transaction should be aborted.
Expand Down Expand Up @@ -141,6 +144,20 @@ func unmarshalOrCopy(plist *pb.PostingList, item *badger.Item) error {
// Use forward iterator with allversions enabled in iter options.
// key would now be owned by the posting list. So, ensure that it isn't reused elsewhere.
func ReadPostingList(key []byte, it *badger.Iterator) (*List, error) {
// Previously, ReadPostingList was not checking that a multi-part list could only
// be read via the main key. This lead to issues during rollup because multi-part
// lists ended up being rolled-up multiple times. This issue was caught by the
// uid-set Jepsen test.
pk, err := x.Parse(key)
if err != nil {
return nil, errors.Wrapf(err, "while reading posting list with key [%v]", key)
}
if pk.HasStartUid {
// Trying to read a single part of a multi part list. This type of list
// should be read once using the canonical list (with startUid equal to zero).
return nil, ErrInvalidKey
}

l := new(List)
l.key = key
l.plist = new(pb.PostingList)
Expand All @@ -166,6 +183,17 @@ func ReadPostingList(key []byte, it *badger.Iterator) (*List, error) {
return nil, err
}
l.minTs = item.Version()

// If this list is a multi-part list, advance past the keys holding the parts.
if len(l.plist.GetSplits()) > 0 {
lastKey, err := x.GetSplitKey(key, math.MaxUint64)
if err != nil {
return nil, errors.Wrapf(err,
"while advancing past the end of multi-part list with key [%v]", key)
}
it.Seek(lastKey)
}

// No need to do Next here. The outer loop can take care of skipping
// more versions of the same key.
return l, nil
Expand Down