From bcb3cc03aba6a1c44132bf710d32e67d60090b2c Mon Sep 17 00:00:00 2001 From: Ajeet D'Souza <98ajeet@gmail.com> Date: Wed, 13 Jan 2021 08:14:34 +0530 Subject: [PATCH 1/6] Check for deleteBelowTs in iterator valid --- posting/list.go | 4 ++++ 1 file changed, 4 insertions(+) diff --git a/posting/list.go b/posting/list.go index 90881dd5b6f..3eac81fda43 100644 --- a/posting/list.go +++ b/posting/list.go @@ -245,6 +245,10 @@ func (it *pIterator) next() error { } func (it *pIterator) valid() (bool, error) { + if it.deleteBelowTs > 0 { + return false, nil + } + if len(it.uids) > 0 { return true, nil } From 5217509f9824528ee23991c55cbb85bdb461fcc2 Mon Sep 17 00:00:00 2001 From: Ajeet D'Souza <98ajeet@gmail.com> Date: Wed, 13 Jan 2021 18:16:44 +0530 Subject: [PATCH 2/6] nil it.uids --- posting/list.go | 1 + 1 file changed, 1 insertion(+) diff --git a/posting/list.go b/posting/list.go index 3eac81fda43..9fe622140c4 100644 --- a/posting/list.go +++ b/posting/list.go @@ -246,6 +246,7 @@ func (it *pIterator) next() error { func (it *pIterator) valid() (bool, error) { if it.deleteBelowTs > 0 { + it.uids = nil return false, nil } From 7b3efdeb816fd9d3ecba7ab8f530b118af94d4ab Mon Sep 17 00:00:00 2001 From: Ajeet D'Souza <98ajeet@gmail.com> Date: Thu, 14 Jan 2021 19:37:26 +0530 Subject: [PATCH 3/6] Add systest --- systest/delete/delete_test.go | 136 ++++++++++++++++++++++++++++++ systest/delete/docker-compose.yml | 43 ++++++++++ 2 files changed, 179 insertions(+) create mode 100644 systest/delete/delete_test.go create mode 100644 systest/delete/docker-compose.yml diff --git a/systest/delete/delete_test.go b/systest/delete/delete_test.go new file mode 100644 index 00000000000..0dc31429414 --- /dev/null +++ b/systest/delete/delete_test.go @@ -0,0 +1,136 @@ +/* + * Copyright 2020 Dgraph Labs, Inc. and Contributors + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package main + +import ( + "context" + "encoding/json" + "fmt" + "log" + "testing" + + "github.com/dgraph-io/dgo/v200" + "github.com/dgraph-io/dgo/v200/protos/api" + "github.com/dgraph-io/dgraph/testutil" + "github.com/stretchr/testify/require" +) + +// TestDeleteEdges inserts a large number of edges into a node, +// and attempts to delete all of them at once. If deletion is +// successful, the new number of edges will be 0. +func TestDeleteEdges(t *testing.T) { + const numEdges = 1000000 + const numBatches = 50 + const uid = "0x1" + + dg, err := testutil.DgraphClient(testutil.SockAddr) + require.NoError(t, err) + + // Batched inserts, since single large inserts are slow. + for i := 0; i < numBatches; i++ { + err = doInsert(dg, uid, numEdges/numBatches) + require.NoError(t, err) + } + + count, err := doQuery(dg, uid) + require.NoError(t, err) + require.Equal(t, numEdges, count) + + err = doDelete(dg, uid) + require.NoError(t, err) + + count, err = doQuery(dg, uid) + require.NoError(t, err) + require.Equal(t, count, 0) +} + +func doInsert(dg *dgo.Dgraph, uid string, numEdges int) error { + log.Printf("inserting %d follows for uid %s", numEdges, uid) + + var setNQuads []byte + for i := 0; i < numEdges; i++ { + nQuad := fmt.Sprintf("<%s> _:f%d .\n", uid, i) + setNQuads = append(setNQuads, []byte(nQuad)...) + } + mutation := &api.Mutation{ + SetNquads: setNQuads, + } + + request := &api.Request{ + Mutations: []*api.Mutation{mutation}, + CommitNow: true, + } + _, err := dg.NewTxn().Do(context.Background(), request) + if err != nil { + return err + } + + return nil +} + +func doQuery(dg *dgo.Dgraph, uid string) (int, error) { + log.Printf("querying follows for uid %s", uid) + + query := fmt.Sprintf(`{ + q(func: uid(%s)) { + count: count(follow) + } + }`, uid) + + response, err := dg.NewReadOnlyTxn().Query(context.Background(), query) + if err != nil { + return 0, err + } + log.Printf("response: %s", string(response.Json)) + + var responseJSON struct { + Q []struct { + Count int + } + } + if err := json.Unmarshal(response.Json, &responseJSON); err != nil { + return 0, err + } + + switch len(responseJSON.Q) { + case 0: + return 0, nil + case 1: + return responseJSON.Q[0].Count, nil + default: + return 0, fmt.Errorf("unexpected number of results: %d", len(responseJSON.Q)) + } +} + +func doDelete(dg *dgo.Dgraph, uid string) error { + log.Printf("deleting follows for uid %s", uid) + + delNQuad := fmt.Sprintf("<%s> * .", uid) + mutation := &api.Mutation{ + DelNquads: []byte(delNQuad), + } + request := &api.Request{ + Mutations: []*api.Mutation{mutation}, + CommitNow: true, + } + _, err := dg.NewTxn().Do(context.Background(), request) + if err != nil { + return err + } + + return nil +} diff --git a/systest/delete/docker-compose.yml b/systest/delete/docker-compose.yml new file mode 100644 index 00000000000..4117659893f --- /dev/null +++ b/systest/delete/docker-compose.yml @@ -0,0 +1,43 @@ +version: "3.5" +services: + alpha1: + image: dgraph/dgraph:latest + container_name: alpha1 + working_dir: /data/alpha1 + labels: + cluster: test + ports: + - 8080:8080 + - 9080:9080 + volumes: + - type: bind + source: $GOPATH/bin + target: /gobin + read_only: true + command: /gobin/dgraph alpha --my=alpha1:7080 --zero=zero1:5080 --logtostderr + -v=2 --whitelist=10.0.0.0/8,172.16.0.0/12,192.168.0.0/16 + deploy: + resources: + limits: + memory: 32G + zero1: + image: dgraph/dgraph:latest + container_name: zero1 + working_dir: /data/zero1 + labels: + cluster: test + ports: + - 5080:5080 + - 6080:6080 + volumes: + - type: bind + source: $GOPATH/bin + target: /gobin + read_only: true + command: /gobin/dgraph zero --idx=1 --my=zero1:5080 --logtostderr -v=2 + --bindall + deploy: + resources: + limits: + memory: 32G +volumes: {} From 1f0a90c41d711d76f529fd51fa385ecedad5ae73 Mon Sep 17 00:00:00 2001 From: Ajeet D'Souza <98ajeet@gmail.com> Date: Thu, 14 Jan 2021 23:06:26 +0530 Subject: [PATCH 4/6] Add comments / simplify code --- posting/list.go | 34 ++++++++++++++++++---------------- 1 file changed, 18 insertions(+), 16 deletions(-) diff --git a/posting/list.go b/posting/list.go index 9fe622140c4..068d4505d06 100644 --- a/posting/list.go +++ b/posting/list.go @@ -114,7 +114,7 @@ type pIterator struct { deleteBelowTs uint64 } -func (it *pIterator) init(l *List, afterUid, deleteBelowTs uint64) error { +func (it *pIterator) seek(l *List, afterUid, deleteBelowTs uint64) error { if deleteBelowTs > 0 && deleteBelowTs <= l.minTs { return errors.Errorf("deleteBelowTs (%d) must be greater than the minTs in the list (%d)", deleteBelowTs, l.minTs) @@ -209,24 +209,19 @@ func (it *pIterator) moveToNextValidPart() error { return nil } - // If there are no more UIDs to iterate over, move to the next part of the - // list that contains valid data. - if len(it.uids) == 0 { - for it.splitIdx <= len(it.l.plist.Splits)-2 { - // moveToNextPart will increment it.splitIdx. Therefore, the for loop must only - // continue until len(splits) - 2. - if err := it.moveToNextPart(); err != nil { - return err - } - - if len(it.uids) > 0 { - return nil - } + // Iterate while there are no UIDs, and while we have more splits to iterate over. + for len(it.uids) == 0 && it.splitIdx < len(it.l.plist.Splits)-1 { + // moveToNextPart will increment it.splitIdx. Therefore, the for loop must only + // continue until len(splits)-1. + if err := it.moveToNextPart(); err != nil { + return err } } + return nil } +// next advances pIterator to the next valid part. func (it *pIterator) next() error { if it.deleteBelowTs > 0 { it.uids = nil @@ -244,6 +239,8 @@ func (it *pIterator) next() error { hex.EncodeToString(it.l.key)) } +// valid asserts that pIterator has valid uids, or advances it to the next valid part. +// It returns false if there are no more valid parts. func (it *pIterator) valid() (bool, error) { if it.deleteBelowTs > 0 { it.uids = nil @@ -571,7 +568,8 @@ func (l *List) setMutation(startTs uint64, data []byte) { l.Unlock() } -// Iterate will allow you to iterate over this posting List, while having acquired a read lock. +// Iterate will allow you to iterate over the mutable and immutable layers of +// this posting List, while having acquired a read lock. // So, please keep this iteration cheap, otherwise mutations would get stuck. // The iteration will start after the provided UID. The results would not include this uid. // The function will loop until either the posting List is fully iterated, or you return a false @@ -654,6 +652,7 @@ func (l *List) pickPostings(readTs uint64) (uint64, []*pb.Posting) { func (l *List) iterate(readTs uint64, afterUid uint64, f func(obj *pb.Posting) error) error { l.AssertRLock() + // mposts is the list of mutable postings deleteBelowTs, mposts := l.pickPostings(readTs) if readTs < l.minTs { return errors.Errorf("readTs: %d less than minTs: %d for key: %q", readTs, l.minTs, l.key) @@ -673,7 +672,9 @@ func (l *List) iterate(readTs uint64, afterUid uint64, f func(obj *pb.Posting) e prevUid uint64 err error ) - err = pitr.init(l, afterUid, deleteBelowTs) + + // pitr iterates through immutable postings + err = pitr.seek(l, afterUid, deleteBelowTs) if err != nil { return errors.Wrapf(err, "cannot initialize iterator when calling List.iterate") } @@ -1438,6 +1439,7 @@ func (l *List) Facets(readTs uint64, param *pb.FacetParams, langs []string, return fcs, nil } +// readListPart reads one split of a posting list from Badger. func (l *List) readListPart(startUid uint64) (*pb.PostingList, error) { key, err := x.SplitKey(l.key, startUid) if err != nil { From 40a35f761e219b07967169ae836dbf90289cfa9a Mon Sep 17 00:00:00 2001 From: Ajeet D'Souza <98ajeet@gmail.com> Date: Thu, 14 Jan 2021 23:21:06 +0530 Subject: [PATCH 5/6] Add sleep before running systest --- systest/delete/delete_test.go | 4 ++++ 1 file changed, 4 insertions(+) diff --git a/systest/delete/delete_test.go b/systest/delete/delete_test.go index 0dc31429414..d817a352005 100644 --- a/systest/delete/delete_test.go +++ b/systest/delete/delete_test.go @@ -22,6 +22,7 @@ import ( "fmt" "log" "testing" + "time" "github.com/dgraph-io/dgo/v200" "github.com/dgraph-io/dgo/v200/protos/api" @@ -37,6 +38,9 @@ func TestDeleteEdges(t *testing.T) { const numBatches = 50 const uid = "0x1" + // Wait for the cluster to come up. + time.Sleep(5 * time.Second) + dg, err := testutil.DgraphClient(testutil.SockAddr) require.NoError(t, err) From d49f62045b0b8c7be5e3468413fe1bd0129ff8b0 Mon Sep 17 00:00:00 2001 From: Ajeet D'Souza <98ajeet@gmail.com> Date: Fri, 15 Jan 2021 14:53:11 +0530 Subject: [PATCH 6/6] Change systest to list test --- posting/list_test.go | 58 ++++++++++--- systest/delete/delete_test.go | 140 ------------------------------ systest/delete/docker-compose.yml | 43 --------- 3 files changed, 45 insertions(+), 196 deletions(-) delete mode 100644 systest/delete/delete_test.go delete mode 100644 systest/delete/docker-compose.yml diff --git a/posting/list_test.go b/posting/list_test.go index a7c06d1655e..14488469dbc 100644 --- a/posting/list_test.go +++ b/posting/list_test.go @@ -36,6 +36,10 @@ import ( "github.com/dgraph-io/dgraph/x" ) +func setMaxListSize(newMaxListSize int) { + maxListSize = newMaxListSize +} + func (l *List) PostingList() *pb.PostingList { l.RLock() defer l.RUnlock() @@ -451,6 +455,7 @@ func TestAddMutation_mrjn1(t *testing.T) { func TestMillion(t *testing.T) { // Ensure list is stored in a single part. + defer setMaxListSize(maxListSize) maxListSize = math.MaxInt32 key := x.DataKey("bal", 1331) @@ -907,10 +912,8 @@ func verifySplits(t *testing.T, splits []uint64) { func createMultiPartList(t *testing.T, size int, addLabel bool) (*List, int) { // For testing, set the max list size to a lower threshold. + defer setMaxListSize(maxListSize) maxListSize = 5000 - defer func() { - maxListSize = math.MaxInt32 - }() key := x.DataKey(uuid.New().String(), 1331) ol, err := getNew(key, ps, math.MaxUint64) @@ -955,10 +958,8 @@ func createMultiPartList(t *testing.T, size int, addLabel bool) (*List, int) { func createAndDeleteMultiPartList(t *testing.T, size int) (*List, int) { // For testing, set the max list size to a lower threshold. - maxListSize = 5000 - defer func() { - maxListSize = math.MaxInt32 - }() + defer setMaxListSize(maxListSize) + maxListSize = 10000 key := x.DataKey(uuid.New().String(), 1331) ol, err := getNew(key, ps, math.MaxUint64) @@ -1007,6 +1008,41 @@ func createAndDeleteMultiPartList(t *testing.T, size int) (*List, int) { return ol, commits } +func TestDeleteStarMultiPartList(t *testing.T) { + numEdges := 10000 + + list, _ := createMultiPartList(t, numEdges, false) + parsedKey, err := x.Parse(list.key) + require.NoError(t, err) + + validateCount := func(expected int) { + count := 0 + list.Iterate(math.MaxUint64, 0, func(posting *pb.Posting) error { + count++ + return nil + }) + require.Equal(t, expected, count) + } + validateCount(numEdges) + + readTs := list.maxTs + 1 + commitTs := readTs + 1 + + txn := NewTxn(readTs) + edge := &pb.DirectedEdge{ + ValueId: parsedKey.Uid, + Attr: parsedKey.Attr, + Value: []byte(x.Star), + Op: pb.DirectedEdge_DEL, + } + err = list.addMutation(context.Background(), txn, edge) + require.NoError(t, err) + + err = list.commitMutation(readTs, commitTs) + require.NoError(t, err) + validateCount(0) +} + func writePostingListToDisk(kvs []*bpb.KV) error { writer := NewTxnWriter(pstore) for _, kv := range kvs { @@ -1147,10 +1183,8 @@ func TestMultiPartListDelete(t *testing.T) { func TestMultiPartListDeleteAndAdd(t *testing.T) { size := int(1e5) // For testing, set the max list size to a lower threshold. + defer setMaxListSize(maxListSize) maxListSize = 5000 - defer func() { - maxListSize = math.MaxInt32 - }() // Add entries to the maps. key := x.DataKey(uuid.New().String(), 1331) @@ -1285,10 +1319,8 @@ func TestSingleListRollup(t *testing.T) { func TestRecursiveSplits(t *testing.T) { // For testing, set the max list size to a lower threshold. + defer setMaxListSize(maxListSize) maxListSize = mb / 2 - defer func() { - maxListSize = math.MaxInt32 - }() // Create a list that should be split recursively. size := int(1e5) diff --git a/systest/delete/delete_test.go b/systest/delete/delete_test.go deleted file mode 100644 index d817a352005..00000000000 --- a/systest/delete/delete_test.go +++ /dev/null @@ -1,140 +0,0 @@ -/* - * Copyright 2020 Dgraph Labs, Inc. and Contributors - * - * Licensed under the Apache License, Version 2.0 (the "License"); - * you may not use this file except in compliance with the License. - * You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package main - -import ( - "context" - "encoding/json" - "fmt" - "log" - "testing" - "time" - - "github.com/dgraph-io/dgo/v200" - "github.com/dgraph-io/dgo/v200/protos/api" - "github.com/dgraph-io/dgraph/testutil" - "github.com/stretchr/testify/require" -) - -// TestDeleteEdges inserts a large number of edges into a node, -// and attempts to delete all of them at once. If deletion is -// successful, the new number of edges will be 0. -func TestDeleteEdges(t *testing.T) { - const numEdges = 1000000 - const numBatches = 50 - const uid = "0x1" - - // Wait for the cluster to come up. - time.Sleep(5 * time.Second) - - dg, err := testutil.DgraphClient(testutil.SockAddr) - require.NoError(t, err) - - // Batched inserts, since single large inserts are slow. - for i := 0; i < numBatches; i++ { - err = doInsert(dg, uid, numEdges/numBatches) - require.NoError(t, err) - } - - count, err := doQuery(dg, uid) - require.NoError(t, err) - require.Equal(t, numEdges, count) - - err = doDelete(dg, uid) - require.NoError(t, err) - - count, err = doQuery(dg, uid) - require.NoError(t, err) - require.Equal(t, count, 0) -} - -func doInsert(dg *dgo.Dgraph, uid string, numEdges int) error { - log.Printf("inserting %d follows for uid %s", numEdges, uid) - - var setNQuads []byte - for i := 0; i < numEdges; i++ { - nQuad := fmt.Sprintf("<%s> _:f%d .\n", uid, i) - setNQuads = append(setNQuads, []byte(nQuad)...) - } - mutation := &api.Mutation{ - SetNquads: setNQuads, - } - - request := &api.Request{ - Mutations: []*api.Mutation{mutation}, - CommitNow: true, - } - _, err := dg.NewTxn().Do(context.Background(), request) - if err != nil { - return err - } - - return nil -} - -func doQuery(dg *dgo.Dgraph, uid string) (int, error) { - log.Printf("querying follows for uid %s", uid) - - query := fmt.Sprintf(`{ - q(func: uid(%s)) { - count: count(follow) - } - }`, uid) - - response, err := dg.NewReadOnlyTxn().Query(context.Background(), query) - if err != nil { - return 0, err - } - log.Printf("response: %s", string(response.Json)) - - var responseJSON struct { - Q []struct { - Count int - } - } - if err := json.Unmarshal(response.Json, &responseJSON); err != nil { - return 0, err - } - - switch len(responseJSON.Q) { - case 0: - return 0, nil - case 1: - return responseJSON.Q[0].Count, nil - default: - return 0, fmt.Errorf("unexpected number of results: %d", len(responseJSON.Q)) - } -} - -func doDelete(dg *dgo.Dgraph, uid string) error { - log.Printf("deleting follows for uid %s", uid) - - delNQuad := fmt.Sprintf("<%s> * .", uid) - mutation := &api.Mutation{ - DelNquads: []byte(delNQuad), - } - request := &api.Request{ - Mutations: []*api.Mutation{mutation}, - CommitNow: true, - } - _, err := dg.NewTxn().Do(context.Background(), request) - if err != nil { - return err - } - - return nil -} diff --git a/systest/delete/docker-compose.yml b/systest/delete/docker-compose.yml deleted file mode 100644 index 4117659893f..00000000000 --- a/systest/delete/docker-compose.yml +++ /dev/null @@ -1,43 +0,0 @@ -version: "3.5" -services: - alpha1: - image: dgraph/dgraph:latest - container_name: alpha1 - working_dir: /data/alpha1 - labels: - cluster: test - ports: - - 8080:8080 - - 9080:9080 - volumes: - - type: bind - source: $GOPATH/bin - target: /gobin - read_only: true - command: /gobin/dgraph alpha --my=alpha1:7080 --zero=zero1:5080 --logtostderr - -v=2 --whitelist=10.0.0.0/8,172.16.0.0/12,192.168.0.0/16 - deploy: - resources: - limits: - memory: 32G - zero1: - image: dgraph/dgraph:latest - container_name: zero1 - working_dir: /data/zero1 - labels: - cluster: test - ports: - - 5080:5080 - - 6080:6080 - volumes: - - type: bind - source: $GOPATH/bin - target: /gobin - read_only: true - command: /gobin/dgraph zero --idx=1 --my=zero1:5080 --logtostderr -v=2 - --bindall - deploy: - resources: - limits: - memory: 32G -volumes: {}