From 7afcb372c92af8d7e16aa825530afb95561d75a7 Mon Sep 17 00:00:00 2001 From: Manish R Jain Date: Tue, 23 Jul 2019 12:53:37 -0700 Subject: [PATCH 1/4] Avoid retrieving data keys if we don't need them to execute the mutation. --- posting/oracle.go | 4 ++++ worker/mutation.go | 23 ++++++++++++++++++++++- 2 files changed, 26 insertions(+), 1 deletion(-) diff --git a/posting/oracle.go b/posting/oracle.go index fada9ec260f..83c0ded1fde 100644 --- a/posting/oracle.go +++ b/posting/oracle.go @@ -77,6 +77,10 @@ func (txn *Txn) Get(key []byte) (*List, error) { return txn.cache.Get(key) } +func (txn *Txn) GetFromDelta(key []byte) (*List, error) { + return txn.cache.GetFromDelta(key) +} + // Update calls UpdateDeltasAndDiscardLists on the local cache. func (txn *Txn) Update() { txn.cache.UpdateDeltasAndDiscardLists() diff --git a/worker/mutation.go b/worker/mutation.go index 41b335c5662..bd691d9a647 100644 --- a/worker/mutation.go +++ b/worker/mutation.go @@ -77,7 +77,28 @@ func runMutation(ctx context.Context, edge *pb.DirectedEdge, txn *posting.Txn) e t := time.Now() key := x.DataKey(edge.Attr, edge.Entity) - plist, err := txn.Get(key) + + var fn func(key []byte) (*posting.List, error) + + switch { + case len(su.GetTokenizer()) > 0 || su.GetCount(): + // Any index or count index. + fn = txn.Get + case su.GetValueType() == pb.Posting_UID && !su.GetList(): + // Single UID, not a list. + fn = txn.Get + case edge.Op == pb.DirectedEdge_DEL && string(edge.Value) == x.Star: + // Delete all. To keep things simple, don't worry about whether indexed or not. + fn = txn.Get + default: + // Reverse index doesn't need the posting list to be read. We already covered count index, + // single uid and delete all above. + // Values, whether single or list, don't need to read. + // Uid list doesn't need to read. + fn = txn.GetFromDelta + } + plist, err := fn(key) + if dur := time.Since(t); dur > time.Millisecond { if span := otrace.FromContext(ctx); span != nil { span.Annotatef([]otrace.Attribute{otrace.BoolAttribute("slow-get", true)}, From 2d915223e4537a62a115aff6fe29eda79852ecf6 Mon Sep 17 00:00:00 2001 From: Manish R Jain Date: Tue, 23 Jul 2019 14:13:47 -0700 Subject: [PATCH 2/4] Add comments to explain the logic. --- worker/mutation.go | 18 +++++++++--------- 1 file changed, 9 insertions(+), 9 deletions(-) diff --git a/worker/mutation.go b/worker/mutation.go index bd691d9a647..df0b842ae4c 100644 --- a/worker/mutation.go +++ b/worker/mutation.go @@ -75,11 +75,14 @@ func runMutation(ctx context.Context, edge *pb.DirectedEdge, txn *posting.Txn) e return err } - t := time.Now() key := x.DataKey(edge.Attr, edge.Entity) - + // The following is a performance optimization which allows us to not read a posting list from + // disk. We calculate this based on how AddMutationWithIndex works. General idea is that if + // we're not using the read posting list, we don't need to retrieve it. We need posting list if + // we're doing indexing or count index or enforcing single UID, etc. In other cases, we can just + // create a posting list facade in memory and use it to store the delta in Badger. Later, Rollup + // operation would consolidate all these deltas into a posting list. var fn func(key []byte) (*posting.List, error) - switch { case len(su.GetTokenizer()) > 0 || su.GetCount(): // Any index or count index. @@ -97,8 +100,9 @@ func runMutation(ctx context.Context, edge *pb.DirectedEdge, txn *posting.Txn) e // Uid list doesn't need to read. fn = txn.GetFromDelta } - plist, err := fn(key) + t := time.Now() + plist, err := fn(key) if dur := time.Since(t); dur > time.Millisecond { if span := otrace.FromContext(ctx); span != nil { span.Annotatef([]otrace.Attribute{otrace.BoolAttribute("slow-get", true)}, @@ -108,11 +112,7 @@ func runMutation(ctx context.Context, edge *pb.DirectedEdge, txn *posting.Txn) e if err != nil { return err } - - if err = plist.AddMutationWithIndex(ctx, edge, txn); err != nil { - return err // abort applying the rest of them. - } - return nil + return plist.AddMutationWithIndex(ctx, edge, txn) } // This is serialized with mutations, called after applied watermarks catch up From 89a079bed8f804577239e4eaa2f2862dc8e3b1bc Mon Sep 17 00:00:00 2001 From: Manish R Jain Date: Tue, 23 Jul 2019 14:35:20 -0700 Subject: [PATCH 3/4] Fix a test failure by retrieving in case of DEL operation. --- worker/mutation.go | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/worker/mutation.go b/worker/mutation.go index df0b842ae4c..598c1abe415 100644 --- a/worker/mutation.go +++ b/worker/mutation.go @@ -90,8 +90,8 @@ func runMutation(ctx context.Context, edge *pb.DirectedEdge, txn *posting.Txn) e case su.GetValueType() == pb.Posting_UID && !su.GetList(): // Single UID, not a list. fn = txn.Get - case edge.Op == pb.DirectedEdge_DEL && string(edge.Value) == x.Star: - // Delete all. To keep things simple, don't worry about whether indexed or not. + case edge.Op == pb.DirectedEdge_DEL: + // Covers various delete cases to keep things simple. fn = txn.Get default: // Reverse index doesn't need the posting list to be read. We already covered count index, From 2db31922debb8a27a288d584957c65cec5a0e975 Mon Sep 17 00:00:00 2001 From: Manish R Jain Date: Wed, 24 Jul 2019 10:23:46 -0700 Subject: [PATCH 4/4] Address Martin's comments --- posting/oracle.go | 1 + worker/mutation.go | 26 +++++++++++++------------- 2 files changed, 14 insertions(+), 13 deletions(-) diff --git a/posting/oracle.go b/posting/oracle.go index 83c0ded1fde..adc4045b075 100644 --- a/posting/oracle.go +++ b/posting/oracle.go @@ -77,6 +77,7 @@ func (txn *Txn) Get(key []byte) (*List, error) { return txn.cache.Get(key) } +// GetFromDelta retrieves the posting list from delta cache, not from Badger. func (txn *Txn) GetFromDelta(key []byte) (*List, error) { return txn.cache.GetFromDelta(key) } diff --git a/worker/mutation.go b/worker/mutation.go index 598c1abe415..63eb54fa2d2 100644 --- a/worker/mutation.go +++ b/worker/mutation.go @@ -77,32 +77,32 @@ func runMutation(ctx context.Context, edge *pb.DirectedEdge, txn *posting.Txn) e key := x.DataKey(edge.Attr, edge.Entity) // The following is a performance optimization which allows us to not read a posting list from - // disk. We calculate this based on how AddMutationWithIndex works. General idea is that if - // we're not using the read posting list, we don't need to retrieve it. We need posting list if - // we're doing indexing or count index or enforcing single UID, etc. In other cases, we can just - // create a posting list facade in memory and use it to store the delta in Badger. Later, Rollup - // operation would consolidate all these deltas into a posting list. - var fn func(key []byte) (*posting.List, error) + // disk. We calculate this based on how AddMutationWithIndex works. The general idea is that if + // we're not using the read posting list, we don't need to retrieve it. We need the posting list + // if we're doing indexing or count index or enforcing single UID, etc. In other cases, we can + // just create a posting list facade in memory and use it to store the delta in Badger. Later, + // the rollup operation would consolidate all these deltas into a posting list. + var getFn func(key []byte) (*posting.List, error) switch { case len(su.GetTokenizer()) > 0 || su.GetCount(): // Any index or count index. - fn = txn.Get + getFn = txn.Get case su.GetValueType() == pb.Posting_UID && !su.GetList(): // Single UID, not a list. - fn = txn.Get + getFn = txn.Get case edge.Op == pb.DirectedEdge_DEL: // Covers various delete cases to keep things simple. - fn = txn.Get + getFn = txn.Get default: // Reverse index doesn't need the posting list to be read. We already covered count index, // single uid and delete all above. - // Values, whether single or list, don't need to read. - // Uid list doesn't need to read. - fn = txn.GetFromDelta + // Values, whether single or list, don't need to be read. + // Uid list doesn't need to be read. + getFn = txn.GetFromDelta } t := time.Now() - plist, err := fn(key) + plist, err := getFn(key) if dur := time.Since(t); dur > time.Millisecond { if span := otrace.FromContext(ctx); span != nil { span.Annotatef([]otrace.Attribute{otrace.BoolAttribute("slow-get", true)},