From 219b04a84c80579089fbb3951128449fdf55c53c Mon Sep 17 00:00:00 2001 From: Martin Martinez Rivera Date: Wed, 15 Jul 2020 13:13:48 -0700 Subject: [PATCH 1/4] fix(Dgraph): update reverse index when updating single UID predicates. (#5868) When updating a single uid predicate with a reverse index, the existing entry in the reverse index should be deleted first. Fixes DGRAPH-1738 (cherry picked from commit 5b70fe8aa2c4dc08f74794b3e9875c8789c11c9f) --- posting/index.go | 52 ++++++++++++++++++++++++++++++++++++++---------- 1 file changed, 42 insertions(+), 10 deletions(-) diff --git a/posting/index.go b/posting/index.go index 084f4c08417..d40547217b2 100644 --- a/posting/index.go +++ b/posting/index.go @@ -190,8 +190,37 @@ func (txn *Txn) addReverseMutation(ctx context.Context, t *pb.DirectedEdge) erro if err != nil { return err } + if plist == nil { + return errors.Errorf("nil posting list for reverse key %s", hex.Dump(key)) + } + + // For single uid predicates, updating the reverse index requires that the existing + // entries for this key in the index are removed. + pred, ok := schema.State().Get(ctx, t.Attr) + isSingleUidUpdate := ok && !pred.GetList() && pred.GetValueType() == pb.Posting_UID && + t.Op == pb.DirectedEdge_SET && t.ValueId != 0 + if isSingleUidUpdate { + dataKey := x.DataKey(t.Attr, t.Entity) + dataList, err := getFn(dataKey) + if err != nil { + return errors.Wrapf(err, "cannot find single uid list to update with key %s", + hex.Dump(dataKey)) + } + err = dataList.Iterate(txn.StartTs, 0, func(p *pb.Posting) error { + delEdge := &pb.DirectedEdge{ + Entity: t.Entity, + ValueId: p.Uid, + Attr: t.Attr, + Op: pb.DirectedEdge_DEL, + } + return txn.addReverseAndCountMutation(ctx, delEdge) + }) + if err != nil { + return errors.Wrapf(err, "cannot remove existing reverse index entries for key %s", + hex.Dump(dataKey)) + } + } - x.AssertTrue(plist != nil) // We must create a copy here. edge := &pb.DirectedEdge{ Entity: t.ValueId, @@ -212,6 +241,7 @@ func (txn *Txn) addReverseMutation(ctx context.Context, t *pb.DirectedEdge) erro return err } } + return nil } @@ -391,8 +421,17 @@ func (l *List) AddMutationWithIndex(ctx context.Context, edge *pb.DirectedEdge, return l.handleDeleteAll(ctx, edge, txn) } - doUpdateIndex := pstore != nil && schema.State().IsIndexed(edge.Attr) - hasCountIndex := schema.State().HasCount(edge.Attr) + doUpdateIndex := pstore != nil && schema.State().IsIndexed(ctx, edge.Attr) + hasCountIndex := schema.State().HasCount(ctx, edge.Attr) + + // Add reverse mutation irrespective of hasMutated, server crash can happen after + // mutation is synced and before reverse edge is synced + if (pstore != nil) && (edge.ValueId != 0) && schema.State().IsReversed(ctx, edge.Attr) { + if err := txn.addReverseAndCountMutation(ctx, edge); err != nil { + return err + } + } + val, found, cp, err := txn.addMutationHelper(ctx, l, doUpdateIndex, hasCountIndex, edge) if err != nil { return err @@ -430,13 +469,6 @@ func (l *List) AddMutationWithIndex(ctx context.Context, edge *pb.DirectedEdge, } } } - // Add reverse mutation irrespective of hasMutated, server crash can happen after - // mutation is synced and before reverse edge is synced - if (pstore != nil) && (edge.ValueId != 0) && schema.State().IsReversed(edge.Attr) { - if err := txn.addReverseMutation(ctx, edge); err != nil { - return err - } - } return nil } From d657586f69eab8d33726f660af6b83ad2828452e Mon Sep 17 00:00:00 2001 From: Daniel Mai Date: Fri, 16 Oct 2020 11:35:35 -0700 Subject: [PATCH 2/4] Remove context arguments. --- posting/index.go | 10 +++++----- 1 file changed, 5 insertions(+), 5 deletions(-) diff --git a/posting/index.go b/posting/index.go index d40547217b2..0fca0117ae6 100644 --- a/posting/index.go +++ b/posting/index.go @@ -196,7 +196,7 @@ func (txn *Txn) addReverseMutation(ctx context.Context, t *pb.DirectedEdge) erro // For single uid predicates, updating the reverse index requires that the existing // entries for this key in the index are removed. - pred, ok := schema.State().Get(ctx, t.Attr) + pred, ok := schema.State().Get(t.Attr) isSingleUidUpdate := ok && !pred.GetList() && pred.GetValueType() == pb.Posting_UID && t.Op == pb.DirectedEdge_SET && t.ValueId != 0 if isSingleUidUpdate { @@ -213,7 +213,7 @@ func (txn *Txn) addReverseMutation(ctx context.Context, t *pb.DirectedEdge) erro Attr: t.Attr, Op: pb.DirectedEdge_DEL, } - return txn.addReverseAndCountMutation(ctx, delEdge) + return txn.addReverseAndCountMutation(delEdge) }) if err != nil { return errors.Wrapf(err, "cannot remove existing reverse index entries for key %s", @@ -421,12 +421,12 @@ func (l *List) AddMutationWithIndex(ctx context.Context, edge *pb.DirectedEdge, return l.handleDeleteAll(ctx, edge, txn) } - doUpdateIndex := pstore != nil && schema.State().IsIndexed(ctx, edge.Attr) - hasCountIndex := schema.State().HasCount(ctx, edge.Attr) + doUpdateIndex := pstore != nil && schema.State().IsIndexed(edge.Attr) + hasCountIndex := schema.State().HasCount(edge.Attr) // Add reverse mutation irrespective of hasMutated, server crash can happen after // mutation is synced and before reverse edge is synced - if (pstore != nil) && (edge.ValueId != 0) && schema.State().IsReversed(ctx, edge.Attr) { + if (pstore != nil) && (edge.ValueId != 0) && schema.State().IsReversed(edge.Attr) { if err := txn.addReverseAndCountMutation(ctx, edge); err != nil { return err } From d377b2f44267aebc73a61af0e701f42cdc2dc2ea Mon Sep 17 00:00:00 2001 From: Aman Mangal Date: Mon, 17 Feb 2020 16:29:03 +0200 Subject: [PATCH 3/4] Optimize computing reverse reindexing (#4755) We used to compute reverse count indexes twice while reindexing. First, while computing reverse edges, and then later while explicitly computing reverse count index. Now, we use a different function while reindexing reverse edges and do not compute the reverse count indexes there any more. --- dgraph/cmd/alpha/reindex_test.go | 68 ++++++++++++++++++++++++++++++++ posting/index.go | 47 ++++++++++++++++------ 2 files changed, 104 insertions(+), 11 deletions(-) diff --git a/dgraph/cmd/alpha/reindex_test.go b/dgraph/cmd/alpha/reindex_test.go index 0628e5a97d1..1043e2ff3fa 100644 --- a/dgraph/cmd/alpha/reindex_test.go +++ b/dgraph/cmd/alpha/reindex_test.go @@ -117,3 +117,71 @@ func TestReindexLang(t *testing.T) { } }`, res) } + +func TestReindexReverseCount(t *testing.T) { + require.NoError(t, dropAll()) + require.NoError(t, alterSchema(`value: [uid] .`)) + + m1 := `{ + set { + <1> <4> . + <1> <5> . + <1> <6> . + <1> <7> . + <1> <8> . + <2> <4> . + <2> <5> . + <2> <6> . + <3> <5> . + <3> <6> . + } + }` + _, err := mutationWithTs(m1, "application/rdf", false, true, 0) + require.NoError(t, err) + + // reindex + require.NoError(t, alterSchema(`value: [uid] @count @reverse .`)) + + q1 := `{ + q(func: eq(count(~value), "3")) { + uid + } + }` + res, _, err := queryWithTs(q1, "application/graphql+-", "", 0) + require.NoError(t, err) + require.JSONEq(t, `{ + "data": { + "q": [ + { + "uid": "0x5" + }, + { + "uid": "0x6" + } + ] + } + }`, res) + + // adding another triplet + m2 := `{ set { <9> <4> . }}` + _, err = mutationWithTs(m2, "application/rdf", false, true, 0) + require.NoError(t, err) + + res, _, err = queryWithTs(q1, "application/graphql+-", "", 0) + require.NoError(t, err) + require.JSONEq(t, `{ + "data": { + "q": [ + { + "uid": "0x4" + }, + { + "uid": "0x5" + }, + { + "uid": "0x6" + } + ] + } + }`, res) +} diff --git a/posting/index.go b/posting/index.go index 0fca0117ae6..a242e01b952 100644 --- a/posting/index.go +++ b/posting/index.go @@ -174,6 +174,30 @@ func (txn *Txn) addReverseMutationHelper(ctx context.Context, plist *List, } func (txn *Txn) addReverseMutation(ctx context.Context, t *pb.DirectedEdge) error { + key := x.ReverseKey(t.Attr, t.ValueId) + plist, err := txn.GetFromDelta(key) + if err != nil { + return err + } + x.AssertTrue(plist != nil) + + // We must create a copy here. + edge := &pb.DirectedEdge{ + Entity: t.ValueId, + ValueId: t.Entity, + Attr: t.Attr, + Op: t.Op, + Facets: t.Facets, + } + if err := plist.addMutation(ctx, txn, edge); err != nil { + return err + } + + ostats.Record(ctx, x.NumEdges.M(1)) + return nil +} + +func (txn *Txn) addReverseAndCountMutation(ctx context.Context, t *pb.DirectedEdge) error { key := x.ReverseKey(t.Attr, t.ValueId) hasCountIndex := schema.State().HasCount(t.Attr) @@ -263,7 +287,7 @@ func (l *List) handleDeleteAll(ctx context.Context, edge *pb.DirectedEdge, case isReversed: // Delete reverse edge for each posting. delEdge.ValueId = p.Uid - return txn.addReverseMutation(ctx, delEdge) + return txn.addReverseAndCountMutation(ctx, delEdge) case isIndexed: // Delete index edge of each posting. val := types.Val{ @@ -314,7 +338,6 @@ func (txn *Txn) addCountMutation(ctx context.Context, t *pb.DirectedEdge, count } ostats.Record(ctx, x.NumEdges.M(1)) return nil - } func (txn *Txn) updateCount(ctx context.Context, params countParams) error { @@ -323,9 +346,11 @@ func (txn *Txn) updateCount(ctx context.Context, params countParams) error { Attr: params.attr, Op: pb.DirectedEdge_DEL, } - if err := txn.addCountMutation(ctx, &edge, uint32(params.countBefore), - params.reverse); err != nil { - return err + if params.countBefore > 0 { + if err := txn.addCountMutation(ctx, &edge, uint32(params.countBefore), + params.reverse); err != nil { + return err + } } if params.countAfter > 0 { @@ -563,13 +588,11 @@ func (r *rebuilder) Run(ctx context.Context) error { dbOpts := badger.DefaultOptions(tmpIndexDir). WithSyncWrites(false). WithNumVersionsToKeep(math.MaxInt64). + WithLogger(&x.ToGlog{}). WithCompression(options.None). WithLogRotatesToFlush(10). WithBlockCacheSize(50) // TODO(Aman): Disable cache altogether - // TODO(Ibrahim): Remove this once badger is updated. - dbOpts.ZSTDCompressionLevel = 1 - tmpDB, err := badger.OpenManaged(dbOpts) if err != nil { return errors.Wrap(err, "error opening temp badger for reindexing") @@ -587,9 +610,6 @@ func (r *rebuilder) Run(ctx context.Context) error { // Todo(Aman): Replace TxnWriter with WriteBatch. While we do that we should ensure that // WriteBatch has a mechanism for throttling. Also, find other places where TxnWriter // could be replaced with WriteBatch in the code - // Todo(Aman): Replace TxnWriter with WriteBatch. While we do that we should ensure that - // WriteBatch has a mechanism for throttling. Also, find other places where TxnWriter - // could be replaced with WriteBatch in the code. tmpWriter := NewTxnWriter(tmpDB) stream := pstore.NewStreamAt(r.startTs) stream.LogPrefix = fmt.Sprintf("Rebuilding index for predicate %s (1/2):", r.attr) @@ -612,6 +632,9 @@ func (r *rebuilder) Run(ctx context.Context) error { return nil, errors.Wrapf(err, "error reading posting list from disk") } + // We are using different transactions in each call to KeyToList function. This could + // be a problem for computing reverse count indexes if deltas for same key are added + // in different transactions. Such a case doesn't occur for now. txn := NewTxn(r.startTs) if err := r.fn(pk.Uid, l, txn); err != nil { return nil, err @@ -1018,6 +1041,8 @@ func rebuildReverseEdges(ctx context.Context, rb *IndexRebuild) error { edge.Label = pp.Label for { + // we only need to build reverse index here. + // We will update the reverse count index separately. err := txn.addReverseMutation(ctx, &edge) switch err { case ErrRetry: From 38010b599bf267f69e680e76449684452799bf15 Mon Sep 17 00:00:00 2001 From: Martin Martinez Rivera Date: Wed, 15 Jul 2020 13:13:48 -0700 Subject: [PATCH 4/4] fix(Dgraph): update reverse index when updating single UID predicates. (#5868) When updating a single uid predicate with a reverse index, the existing entry in the reverse index should be deleted first. Fixes DGRAPH-1738 --- posting/index.go | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/posting/index.go b/posting/index.go index a242e01b952..bd919a3e821 100644 --- a/posting/index.go +++ b/posting/index.go @@ -237,7 +237,7 @@ func (txn *Txn) addReverseAndCountMutation(ctx context.Context, t *pb.DirectedEd Attr: t.Attr, Op: pb.DirectedEdge_DEL, } - return txn.addReverseAndCountMutation(delEdge) + return txn.addReverseAndCountMutation(ctx, delEdge) }) if err != nil { return errors.Wrapf(err, "cannot remove existing reverse index entries for key %s",