Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Pass read timestamp to getNew. #5085

Merged
merged 3 commits into from
Apr 2, 2020
Merged
Show file tree
Hide file tree
Changes from 1 commit
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
10 changes: 5 additions & 5 deletions posting/index_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -182,7 +182,7 @@ func TestTokensTable(t *testing.T) {
require.NoError(t, schema.ParseBytes([]byte(schemaVal), 1))

key := x.DataKey("name", 1)
l, err := getNew(key, ps)
l, err := getNew(key, ps, math.MaxUint64)
require.NoError(t, err)

edge := &pb.DirectedEdge{
Expand Down Expand Up @@ -236,7 +236,7 @@ func addEdgeToValue(t *testing.T, attr string, src uint64,
Entity: src,
Op: pb.DirectedEdge_SET,
}
l, err := GetNoStore(x.DataKey(attr, src))
l, err := GetNoStore(x.DataKey(attr, src), startTs)
require.NoError(t, err)
// No index entries added here as we do not call AddMutationWithIndex.
addMutation(t, l, edge, Set, startTs, commitTs, false)
Expand All @@ -252,7 +252,7 @@ func addEdgeToUID(t *testing.T, attr string, src uint64,
Entity: src,
Op: pb.DirectedEdge_SET,
}
l, err := GetNoStore(x.DataKey(attr, src))
l, err := GetNoStore(x.DataKey(attr, src), startTs)
require.NoError(t, err)
// No index entries added here as we do not call AddMutationWithIndex.
addMutation(t, l, edge, Set, startTs, commitTs, false)
Expand Down Expand Up @@ -292,7 +292,7 @@ func TestRebuildTokIndex(t *testing.T) {
continue
}
idxKeys = append(idxKeys, string(key))
l, err := GetNoStore(key)
l, err := GetNoStore(key, 6)
require.NoError(t, err)
idxVals = append(idxVals, l)
}
Expand Down Expand Up @@ -355,7 +355,7 @@ func TestRebuildTokIndexWithDeletion(t *testing.T) {
continue
}
idxKeys = append(idxKeys, string(key))
l, err := GetNoStore(key)
l, err := GetNoStore(key, 7)
require.NoError(t, err)
idxVals = append(idxVals, l)
}
Expand Down
68 changes: 34 additions & 34 deletions posting/list_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -171,7 +171,7 @@ func checkValue(t *testing.T, ol *List, val string, readTs uint64) {
// TODO(txn): Add tests after lru eviction
func TestAddMutation_Value(t *testing.T) {
key := x.DataKey("value", 10)
ol, err := getNew(key, ps)
ol, err := getNew(key, ps, math.MaxUint64)
require.NoError(t, err)
edge := &pb.DirectedEdge{
Value: []byte("oh hey there"),
Expand All @@ -194,7 +194,7 @@ func TestAddMutation_Value(t *testing.T) {

func TestAddMutation_jchiu1(t *testing.T) {
key := x.DataKey("value", 12)
ol, err := GetNoStore(key)
ol, err := GetNoStore(key, math.MaxUint64)
require.NoError(t, err)

// Set value to cars and merge to BadgerDB.
Expand Down Expand Up @@ -241,7 +241,7 @@ func TestAddMutation_jchiu1(t *testing.T) {

func TestAddMutation_DelSet(t *testing.T) {
key := x.DataKey("value", 1534)
ol, err := GetNoStore(key)
ol, err := GetNoStore(key, math.MaxUint64)
require.NoError(t, err)

// DO sp*, don't commit
Expand All @@ -267,7 +267,7 @@ func TestAddMutation_DelSet(t *testing.T) {

func TestAddMutation_DelRead(t *testing.T) {
key := x.DataKey("value", 1543)
ol, err := GetNoStore(key)
ol, err := GetNoStore(key, math.MaxUint64)
require.NoError(t, err)

// Set value to newcars, and commit it
Expand Down Expand Up @@ -306,7 +306,7 @@ func TestAddMutation_DelRead(t *testing.T) {

func TestAddMutation_jchiu2(t *testing.T) {
key := x.DataKey("value", 15)
ol, err := GetNoStore(key)
ol, err := GetNoStore(key, math.MaxUint64)
require.NoError(t, err)

// Del a value cars and but don't merge.
Expand All @@ -330,7 +330,7 @@ func TestAddMutation_jchiu2(t *testing.T) {

func TestAddMutation_jchiu2_Commit(t *testing.T) {
key := x.DataKey("value", 16)
ol, err := GetNoStore(key)
ol, err := GetNoStore(key, math.MaxUint64)
require.NoError(t, err)

// Del a value cars and but don't merge.
Expand All @@ -357,7 +357,7 @@ func TestAddMutation_jchiu2_Commit(t *testing.T) {

func TestAddMutation_jchiu3(t *testing.T) {
key := x.DataKey("value", 29)
ol, err := GetNoStore(key)
ol, err := GetNoStore(key, math.MaxUint64)
require.NoError(t, err)

// Set value to cars and merge to BadgerDB.
Expand Down Expand Up @@ -401,7 +401,7 @@ func TestAddMutation_jchiu3(t *testing.T) {

func TestAddMutation_mrjn1(t *testing.T) {
key := x.DataKey("value", 21)
ol, err := GetNoStore(key)
ol, err := GetNoStore(key, math.MaxUint64)
require.NoError(t, err)

// Set a value cars and merge.
Expand Down Expand Up @@ -454,7 +454,7 @@ func TestMillion(t *testing.T) {
maxListSize = math.MaxInt32

key := x.DataKey("bal", 1331)
ol, err := getNew(key, ps)
ol, err := getNew(key, ps, math.MaxUint64)
require.NoError(t, err)
var commits int
N := int(1e6)
Expand All @@ -472,7 +472,7 @@ func TestMillion(t *testing.T) {
kvs, err := ol.Rollup()
require.NoError(t, err)
require.NoError(t, writePostingListToDisk(kvs))
ol, err = getNew(key, ps)
ol, err = getNew(key, ps, math.MaxUint64)
require.NoError(t, err)
}
commits++
Expand All @@ -492,7 +492,7 @@ func TestMillion(t *testing.T) {
func TestAddMutation_mrjn2(t *testing.T) {
ctx := context.Background()
key := x.DataKey("bal", 1001)
ol, err := getNew(key, ps)
ol, err := getNew(key, ps, math.MaxUint64)
require.NoError(t, err)
var readTs uint64
for readTs = 1; readTs < 10; readTs++ {
Expand Down Expand Up @@ -570,7 +570,7 @@ func TestAddMutation_mrjn2(t *testing.T) {

func TestAddMutation_gru(t *testing.T) {
key := x.DataKey("question.tag", 0x01)
ol, err := getNew(key, ps)
ol, err := getNew(key, ps, math.MaxUint64)
require.NoError(t, err)

{
Expand Down Expand Up @@ -607,7 +607,7 @@ func TestAddMutation_gru(t *testing.T) {

func TestAddMutation_gru2(t *testing.T) {
key := x.DataKey("question.tag", 0x100)
ol, err := getNew(key, ps)
ol, err := getNew(key, ps, math.MaxUint64)
require.NoError(t, err)

{
Expand Down Expand Up @@ -659,7 +659,7 @@ func TestAddAndDelMutation(t *testing.T) {
// Ensure each test uses unique key since we don't clear the postings
// after each test
key := x.DataKey("dummy_key", 0x927)
ol, err := getNew(key, ps)
ol, err := getNew(key, ps, math.MaxUint64)
require.NoError(t, err)

{
Expand Down Expand Up @@ -689,7 +689,7 @@ func TestAddAndDelMutation(t *testing.T) {

func TestAfterUIDCount(t *testing.T) {
key := x.DataKey("value", 22)
ol, err := getNew(key, ps)
ol, err := getNew(key, ps, math.MaxUint64)
require.NoError(t, err)
// Set value to cars and merge to BadgerDB.
edge := &pb.DirectedEdge{
Expand Down Expand Up @@ -763,7 +763,7 @@ func TestAfterUIDCount(t *testing.T) {

func TestAfterUIDCount2(t *testing.T) {
key := x.DataKey("value", 23)
ol, err := getNew(key, ps)
ol, err := getNew(key, ps, math.MaxUint64)
require.NoError(t, err)

// Set value to cars and merge to BadgerDB.
Expand Down Expand Up @@ -793,7 +793,7 @@ func TestAfterUIDCount2(t *testing.T) {

func TestDelete(t *testing.T) {
key := x.DataKey("value", 25)
ol, err := getNew(key, ps)
ol, err := getNew(key, ps, math.MaxUint64)
require.NoError(t, err)

// Set value to cars and merge to BadgerDB.
Expand All @@ -817,7 +817,7 @@ func TestDelete(t *testing.T) {

func TestAfterUIDCountWithCommit(t *testing.T) {
key := x.DataKey("value", 26)
ol, err := getNew(key, ps)
ol, err := getNew(key, ps, math.MaxUint64)
require.NoError(t, err)

// Set value to cars and merge to BadgerDB.
Expand Down Expand Up @@ -903,7 +903,7 @@ func createMultiPartList(t *testing.T, size int, addLabel bool) (*List, int) {
}()

key := x.DataKey(uuid.New().String(), 1331)
ol, err := getNew(key, ps)
ol, err := getNew(key, ps, math.MaxUint64)
require.NoError(t, err)
commits := 0
for i := 1; i <= size; i++ {
Expand All @@ -921,7 +921,7 @@ func createMultiPartList(t *testing.T, size int, addLabel bool) (*List, int) {
kvs, err := ol.Rollup()
require.NoError(t, err)
require.NoError(t, writePostingListToDisk(kvs))
ol, err = getNew(key, ps)
ol, err = getNew(key, ps, math.MaxUint64)
require.NoError(t, err)
}
commits++
Expand All @@ -933,7 +933,7 @@ func createMultiPartList(t *testing.T, size int, addLabel bool) (*List, int) {
}
require.NoError(t, err)
require.NoError(t, writePostingListToDisk(kvs))
ol, err = getNew(key, ps)
ol, err = getNew(key, ps, math.MaxUint64)
require.NoError(t, err)
require.True(t, len(ol.plist.Splits) > 0)

Expand All @@ -948,7 +948,7 @@ func createAndDeleteMultiPartList(t *testing.T, size int) (*List, int) {
}()

key := x.DataKey(uuid.New().String(), 1331)
ol, err := getNew(key, ps)
ol, err := getNew(key, ps, math.MaxUint64)
require.NoError(t, err)
commits := 0
for i := 1; i <= size; i++ {
Expand All @@ -963,7 +963,7 @@ func createAndDeleteMultiPartList(t *testing.T, size int) (*List, int) {
kvs, err := ol.Rollup()
require.NoError(t, err)
require.NoError(t, writePostingListToDisk(kvs))
ol, err = getNew(key, ps)
ol, err = getNew(key, ps, math.MaxUint64)
require.NoError(t, err)
}
commits++
Expand All @@ -983,7 +983,7 @@ func createAndDeleteMultiPartList(t *testing.T, size int) (*List, int) {
kvs, err := ol.Rollup()
require.NoError(t, err)
require.NoError(t, writePostingListToDisk(kvs))
ol, err = getNew(key, ps)
ol, err = getNew(key, ps, math.MaxUint64)
require.NoError(t, err)
}
commits++
Expand Down Expand Up @@ -1089,7 +1089,7 @@ func TestMultiPartListWriteToDisk(t *testing.T) {
require.Equal(t, len(kvs), len(originalList.plist.Splits)+1)

require.NoError(t, writePostingListToDisk(kvs))
newList, err := getNew(kvs[0].Key, ps)
newList, err := getNew(kvs[0].Key, ps, math.MaxUint64)
require.NoError(t, err)

opt := ListOptions{ReadTs: uint64(size) + 1}
Expand Down Expand Up @@ -1140,7 +1140,7 @@ func TestMultiPartListDeleteAndAdd(t *testing.T) {

// Add entries to the maps.
key := x.DataKey(uuid.New().String(), 1331)
ol, err := getNew(key, ps)
ol, err := getNew(key, ps, math.MaxUint64)
require.NoError(t, err)
for i := 1; i <= size; i++ {
edge := &pb.DirectedEdge{
Expand All @@ -1154,7 +1154,7 @@ func TestMultiPartListDeleteAndAdd(t *testing.T) {
kvs, err := ol.Rollup()
require.NoError(t, err)
require.NoError(t, writePostingListToDisk(kvs))
ol, err = getNew(key, ps)
ol, err = getNew(key, ps, math.MaxUint64)
require.NoError(t, err)
}
}
Expand All @@ -1181,7 +1181,7 @@ func TestMultiPartListDeleteAndAdd(t *testing.T) {
kvs, err := ol.Rollup()
require.NoError(t, err)
require.NoError(t, writePostingListToDisk(kvs))
ol, err = getNew(key, ps)
ol, err = getNew(key, ps, math.MaxUint64)
require.NoError(t, err)
}
}
Expand All @@ -1190,7 +1190,7 @@ func TestMultiPartListDeleteAndAdd(t *testing.T) {
kvs, err := ol.Rollup()
require.NoError(t, err)
require.NoError(t, writePostingListToDisk(kvs))
ol, err = getNew(key, ps)
ol, err = getNew(key, ps, math.MaxUint64)
require.NoError(t, err)
for _, kv := range kvs {
require.Equal(t, baseStartTs+uint64(1+size/2), kv.Version)
Expand Down Expand Up @@ -1218,7 +1218,7 @@ func TestMultiPartListDeleteAndAdd(t *testing.T) {
kvs, err := ol.Rollup()
require.NoError(t, err)
require.NoError(t, writePostingListToDisk(kvs))
ol, err = getNew(key, ps)
ol, err = getNew(key, ps, math.MaxUint64)
require.NoError(t, err)
}
}
Expand All @@ -1227,7 +1227,7 @@ func TestMultiPartListDeleteAndAdd(t *testing.T) {
kvs, err = ol.Rollup()
require.NoError(t, err)
require.NoError(t, writePostingListToDisk(kvs))
ol, err = getNew(key, ps)
ol, err = getNew(key, ps, math.MaxUint64)
require.NoError(t, err)

// Verify all entries are once again in the list.
Expand Down Expand Up @@ -1280,7 +1280,7 @@ func TestRecursiveSplits(t *testing.T) {
// Create a list that should be split recursively.
size := int(1e5)
key := x.DataKey(uuid.New().String(), 1331)
ol, err := getNew(key, ps)
ol, err := getNew(key, ps, math.MaxUint64)
require.NoError(t, err)
commits := 0
for i := 1; i <= size; i++ {
Expand All @@ -1302,7 +1302,7 @@ func TestRecursiveSplits(t *testing.T) {
kvs, err := ol.Rollup()
require.NoError(t, err)
require.NoError(t, writePostingListToDisk(kvs))
ol, err = getNew(key, ps)
ol, err = getNew(key, ps, math.MaxUint64)
require.NoError(t, err)
require.True(t, len(ol.plist.Splits) > 2)

Expand Down Expand Up @@ -1344,7 +1344,7 @@ func TestMain(m *testing.M) {

func BenchmarkAddMutations(b *testing.B) {
key := x.DataKey("name", 1)
l, err := getNew(key, ps)
l, err := getNew(key, ps, math.MaxUint64)
if err != nil {
b.Error(err)
}
Expand Down
8 changes: 4 additions & 4 deletions posting/lists.go
Original file line number Diff line number Diff line change
Expand Up @@ -146,8 +146,8 @@ func Cleanup() {

// GetNoStore returns the list stored in the key or creates a new one if it doesn't exist.
// It does not store the list in any cache.
func GetNoStore(key []byte) (rlist *List, err error) {
return getNew(key, pstore)
func GetNoStore(key []byte, readTs uint64) (rlist *List, err error) {
return getNew(key, pstore, readTs)
}

// LocalCache stores a cache of posting lists and deltas.
Expand Down Expand Up @@ -205,7 +205,7 @@ func (lc *LocalCache) SetIfAbsent(key string, updated *List) *List {

func (lc *LocalCache) getInternal(key []byte, readFromDisk bool) (*List, error) {
if lc == nil {
return getNew(key, pstore)
return getNew(key, pstore, lc.startTs)
}
skey := string(key)
if pl := lc.getNoStore(skey); pl != nil {
Expand All @@ -215,7 +215,7 @@ func (lc *LocalCache) getInternal(key []byte, readFromDisk bool) (*List, error)
var pl *List
if readFromDisk {
var err error
pl, err = getNew(key, pstore)
pl, err = getNew(key, pstore, lc.startTs)
if err != nil {
return nil, err
}
Expand Down
Loading