From d697ca0898f0ac951fb041bfc517f77377e0cd28 Mon Sep 17 00:00:00 2001 From: Martin Martinez Rivera Date: Tue, 23 Jul 2019 12:06:16 -0700 Subject: [PATCH] Don't read posting lists from disk when mutating indices. (#3695) Adding index mutations can be performed without reading the posting lists on disk. This change modifies the indexing process to avoid reading any list from disk. It should result in performance improvements, especially when trying to modify big index posting lists. --- posting/index.go | 8 ++++---- posting/lists.go | 32 +++++++++++++++++++++++++++----- 2 files changed, 31 insertions(+), 9 deletions(-) diff --git a/posting/index.go b/posting/index.go index efa4eccd1b5..36aa4a0b267 100644 --- a/posting/index.go +++ b/posting/index.go @@ -114,7 +114,7 @@ func (txn *Txn) addIndexMutation(ctx context.Context, edge *pb.DirectedEdge, token string) error { key := x.IndexKey(edge.Attr, token) - plist, err := txn.Get(key) + plist, err := txn.cache.GetFromDelta(key) if err != nil { return err } @@ -169,7 +169,7 @@ func (txn *Txn) addReverseMutationHelper(ctx context.Context, plist *List, func (txn *Txn) addReverseMutation(ctx context.Context, t *pb.DirectedEdge) error { key := x.ReverseKey(t.Attr, t.ValueId) - plist, err := txn.Get(key) + plist, err := txn.cache.GetFromDelta(key) if err != nil { return err } @@ -256,7 +256,7 @@ func (l *List) handleDeleteAll(ctx context.Context, edge *pb.DirectedEdge, func (txn *Txn) addCountMutation(ctx context.Context, t *pb.DirectedEdge, count uint32, reverse bool) error { key := x.CountKey(t.Attr, count, reverse) - plist, err := txn.Get(key) + plist, err := txn.cache.GetFromDelta(key) if err != nil { return err } @@ -940,7 +940,7 @@ func rebuildListType(ctx context.Context, rb *IndexRebuild) error { // Ensure that list is in the cache run by txn. Otherwise, nothing would // get updated. 
- txn.cache.SetIfAbsent(string(pl.key), pl) + pl = txn.cache.SetIfAbsent(string(pl.key), pl) if err := pl.addMutation(ctx, txn, t); err != nil { return err } diff --git a/posting/lists.go b/posting/lists.go index 973e254945a..cc803e25e5e 100644 --- a/posting/lists.go +++ b/posting/lists.go @@ -221,8 +221,7 @@ func (lc *LocalCache) SetIfAbsent(key string, updated *List) *List { return updated } -// Get retrieves the cached version of the list associated with the given key. -func (lc *LocalCache) Get(key []byte) (*List, error) { +func (lc *LocalCache) getInternal(key []byte, readFromDisk bool) (*List, error) { if lc == nil { return getNew(key, pstore) } @@ -231,10 +230,21 @@ func (lc *LocalCache) Get(key []byte) (*List, error) { return pl, nil } - pl, err := getNew(key, pstore) - if err != nil { - return nil, err + var pl *List + if readFromDisk { + var err error + pl, err = getNew(key, pstore) + if err != nil { + return nil, err + } + } else { + pl = &List{ + key: key, + mutationMap: make(map[uint64]*pb.PostingList), + plist: new(pb.PostingList), + } } + // If we just brought this posting list into memory and we already have a delta for it, let's // apply it before returning the list. lc.RLock() @@ -245,6 +255,18 @@ func (lc *LocalCache) Get(key []byte) (*List, error) { return lc.SetIfAbsent(skey, pl), nil } +// Get retrieves the cached version of the list associated with the given key. +func (lc *LocalCache) Get(key []byte) (*List, error) { + return lc.getInternal(key, true) +} + +// GetFromDelta gets the cached version of the list without reading from disk +// and only applies the existing deltas. This is used in situations where the +// posting list will only be modified and not read (e.g adding index mutations). +func (lc *LocalCache) GetFromDelta(key []byte) (*List, error) { + return lc.getInternal(key, false) +} + // UpdateDeltasAndDiscardLists updates the delta cache before removing the stored posting lists. 
func (lc *LocalCache) UpdateDeltasAndDiscardLists() { lc.Lock()