Skip to content

Commit

Permalink
minor refactor
Browse files Browse the repository at this point in the history
  • Loading branch information
Harshil Goel committed Sep 29, 2023
1 parent 84a1a5a commit d572cae
Show file tree
Hide file tree
Showing 3 changed files with 40 additions and 169 deletions.
6 changes: 3 additions & 3 deletions posting/list_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -475,10 +475,10 @@ func TestReadSingleValue(t *testing.T) {
j = int(ol.minTs)
}
for ; j < i+6; j++ {
k, err := GetSingleValueForKey(key, uint64(j))
tx := NewTxn(uint64(j))
k, err := tx.cache.GetSinglePosting(key)
require.NoError(t, err)
p := getFirst(t, k, uint64(j))
checkValue(t, ol, string(p.Value), uint64(j))
checkValue(t, ol, string(k.Postings[0].Value), uint64(j))
}

}
Expand Down
128 changes: 37 additions & 91 deletions posting/lists.go
Original file line number Diff line number Diff line change
Expand Up @@ -152,64 +152,12 @@ func (lc *LocalCache) SetIfAbsent(key string, updated *List) *List {
return updated
}

func (lc *LocalCache) getSingleInternal(key []byte, readFromDisk bool) (*List, error) {
getNewPlistNil := func() (*List, error) {
lc.RLock()
defer lc.RUnlock()
if lc.plists == nil {
pl, err := GetSingleValueForKey(key, lc.startTs)
return pl, err
}
return nil, nil
}

if l, err := getNewPlistNil(); l != nil || err != nil {
return l, err
}

skey := string(key)
if pl := lc.getNoStore(skey); pl != nil {
return pl, nil
}

var pl *List
var err error
if readFromDisk {
pl, err = GetSingleValueForKey(key, lc.startTs)
} else {
pl = &List{
key: key,
plist: new(pb.PostingList),
}
}

// If we just brought this posting list into memory and we already have a delta for it, let's
// apply it before returning the list.
lc.RLock()
if delta, ok := lc.deltas[skey]; ok && len(delta) > 0 {
pl.setMutation(lc.startTs, delta)
}
lc.RUnlock()
return pl, err
}

func (lc *LocalCache) getInternal(key []byte, readFromDisk bool) (*List, error) {
getNewPlistNil := func() (*List, error) {
lc.RLock()
defer lc.RUnlock()
if lc.plists == nil {
if readFromDisk {
return getNew(key, pstore, lc.startTs)
} else {
pl := &List{
key: key,
plist: new(pb.PostingList),
}
if delta, ok := lc.deltas[string(key)]; ok && len(delta) > 0 {
pl.setMutation(lc.startTs, delta)
}
return pl, nil
}
return getNew(key, pstore, lc.startTs)
}
return nil, nil
}
Expand Down Expand Up @@ -248,60 +196,58 @@ func (lc *LocalCache) getInternal(key []byte, readFromDisk bool) (*List, error)
}

func (lc *LocalCache) GetSinglePosting(key []byte) (*pb.PostingList, error) {
pl := &pb.PostingList{}
validatePl := func() {
i := 0
for _, postings := range pl.Postings {
if hasDeleteAll(postings) {
pl = nil
return
}
if postings.Op != Del {
pl.Postings[i] = postings
i++

getPostings := func() (*pb.PostingList, error) {
pl := &pb.PostingList{}
lc.RLock()
if delta, ok := lc.deltas[string(key)]; ok && len(delta) > 0 {
err := pl.Unmarshal(delta)
if err != nil {
lc.RUnlock()
return pl, nil
}
}
pl.Postings = pl.Postings[:i]
}
lc.RLock()
if delta, ok := lc.deltas[string(key)]; ok && len(delta) > 0 {
err := pl.Unmarshal(delta)
lc.RUnlock()

txn := pstore.NewTransactionAt(lc.startTs, false)
item, err := txn.Get(key)
if err != nil {
validatePl()
return pl, nil
return pl, err
}
} else {
lc.RUnlock()
}

txn := pstore.NewTransactionAt(lc.startTs, false)
item, err := txn.Get(key)
if err != nil {
validatePl()
err = item.Value(func(val []byte) error {
if err := pl.Unmarshal(val); err != nil {
return err
}
return nil
})

return pl, err
}

err = item.Value(func(val []byte) error {
if err := pl.Unmarshal(val); err != nil {
return err
}
return nil
})

pl, err := getPostings()
if err == badger.ErrKeyNotFound {
err = nil
}
if err != nil {
validatePl()
return pl, err
}

validatePl()
// Filter and remove STAR_ALL and OP_DELETE Postings
idx := 0
for _, postings := range pl.Postings {
if hasDeleteAll(postings) {
return nil, nil
}
if postings.Op != Del {
pl.Postings[idx] = postings
idx++
}
}
pl.Postings = pl.Postings[:idx]
return pl, nil
}

func (lc *LocalCache) GetSingle(key []byte) (*List, error) {
return lc.getSingleInternal(key, true)
}

// Get retrieves the cached version of the list associated with the given key.
func (lc *LocalCache) Get(key []byte) (*List, error) {
return lc.getInternal(key, true)
Expand Down
75 changes: 0 additions & 75 deletions posting/mvcc.go
Original file line number Diff line number Diff line change
Expand Up @@ -457,81 +457,6 @@ func ReadPostingList(key []byte, it *badger.Iterator) (*List, error) {
return l, nil
}

func GetSingleValueForKey(key []byte, readTs uint64) (*List, error) {
cachedVal, ok := lCache.Get(key)
if ok {
l, ok := cachedVal.(*List)
if ok && l != nil {
// No need to clone the immutable layer or the key since mutations will not modify it.
lCopy := &List{
minTs: l.minTs,
maxTs: l.maxTs,
key: key,
plist: l.plist,
}
l.RLock()
if l.mutationMap != nil {
lCopy.mutationMap = make(map[uint64]*pb.PostingList, len(l.mutationMap))
for ts, pl := range l.mutationMap {
lCopy.mutationMap[ts] = proto.Clone(pl).(*pb.PostingList)
}
}
l.RUnlock()
return lCopy, nil
}
}

if pstore.IsClosed() {
return nil, badger.ErrDBClosed
}

l := new(List)
l.key = key
l.plist = new(pb.PostingList)

txn := pstore.NewTransactionAt(readTs, false)
item, err := txn.Get(key)
if err != nil {
return l, err
}

l.maxTs = x.Max(l.maxTs, item.Version())

switch item.UserMeta() {
case BitEmptyPosting:
l.minTs = item.Version()
case BitCompletePosting:
if err := unmarshalOrCopy(l.plist, item); err != nil {
return l, nil
}
l.minTs = item.Version()

case BitDeltaPosting:
err := item.Value(func(val []byte) error {
pl := &pb.PostingList{}
if err := pl.Unmarshal(val); err != nil {
return err
}
pl.CommitTs = item.Version()
for _, mpost := range pl.Postings {
// commitTs, startTs are meant to be only in memory, not
// stored on disk.
mpost.CommitTs = item.Version()
}
if l.mutationMap == nil {
l.mutationMap = make(map[uint64]*pb.PostingList)
}
l.mutationMap[pl.CommitTs] = pl
return nil
})
if err != nil {
return l, nil
}
}

return l, nil
}

func getNew(key []byte, pstore *badger.DB, readTs uint64) (*List, error) {
cachedVal, ok := lCache.Get(key)
if ok {
Expand Down

0 comments on commit d572cae

Please sign in to comment.