From 0fab55250afe995a0f38c6184ddc37b82fad9a2a Mon Sep 17 00:00:00 2001 From: rahulgurnani Date: Thu, 15 Oct 2020 12:58:02 +0530 Subject: [PATCH 1/4] Fix DGRAPH-2446: Dgraph missing upserts in ludicrous mode --- worker/draft.go | 3 ++- 1 file changed, 2 insertions(+), 1 deletion(-) diff --git a/worker/draft.go b/worker/draft.go index 6836792f421..bdbe54cdef7 100644 --- a/worker/draft.go +++ b/worker/draft.go @@ -679,7 +679,8 @@ func (n *node) processApplyCh() { psz := proposal.Size() totalSize += int64(psz) - if x.WorkerConfig.LudicrousMode && proposal.Mutations != nil && proposal.Mutations.StartTs == 0 { + // In case of upserts the startTs would be > 0, so, no need to check startTs is 0 + if x.WorkerConfig.LudicrousMode && proposal.Mutations != nil { proposal.Mutations.StartTs = State.GetTimestamp(false) } From b2104bf99f70e516bd2f726cc67f1ac581255598 Mon Sep 17 00:00:00 2001 From: rahulgurnani Date: Thu, 15 Oct 2020 16:49:10 +0530 Subject: [PATCH 2/4] Add tests --- systest/ludicrous/upsert_test.go | 152 +++++++++++++++++++++++++++++++ 1 file changed, 152 insertions(+) create mode 100644 systest/ludicrous/upsert_test.go diff --git a/systest/ludicrous/upsert_test.go b/systest/ludicrous/upsert_test.go new file mode 100644 index 00000000000..a13e5577bc5 --- /dev/null +++ b/systest/ludicrous/upsert_test.go @@ -0,0 +1,152 @@ +package worker + +import ( + "context" + "encoding/json" + "fmt" + "sync" + "testing" + + "github.com/dgraph-io/dgo/v200/protos/api" + "github.com/dgraph-io/dgraph/testutil" + "github.com/stretchr/testify/require" +) + +type Person struct { + Name string `json:"name,omitempty"` + email string + count int +} + +type Data struct { + Name string `json:"name,omitempty"` + Counts []int `json:"count,omitempty"` +} +type ResponseData struct { + All []Data `json:"all,omitempty"` +} + +func InitData(t *testing.T) { + dg, err := testutil.DgraphClient(testutil.SockAddr) + testutil.DropAll(t, dg) + require.NoError(t, err) + schema := ` + name: string @index(exact) . + email: string @index(exact) . + count: [int] . + ` + + err = dg.Alter(context.Background(), &api.Operation{Schema: schema}) + require.NoError(t, err) + + p := Person{ + Name: "Alice", + email: "alice@dgraph.io", + count: 1, + } + pb, err := json.Marshal(p) + require.NoError(t, err) + + mu := &api.Mutation{ + SetJson: pb, + CommitNow: true, + } + txn := dg.NewTxn() + ctx := context.Background() + defer txn.Discard(ctx) + + _, err = txn.Mutate(ctx, mu) + require.NoError(t, err) +} + +func TestConcurrentUpdate(t *testing.T) { + InitData(t) + ctx := context.Background() + dg, err := testutil.DgraphClient(testutil.SockAddr) + require.NoError(t, err) + + count := 10 + var wg sync.WaitGroup + wg.Add(count) + mutation := func(i int) { + defer wg.Done() + query := fmt.Sprintf(`query { + user as var(func: eq(name, "Alice")) + }`) + mu := &api.Mutation{ + SetNquads: []byte(fmt.Sprintf(`uid(user) "%d" .`, i)), + } + req := &api.Request{ + Query: query, + Mutations: []*api.Mutation{mu}, + CommitNow: true, + } + _, err := dg.NewTxn().Do(ctx, req) + require.NoError(t, err) + } + for i := 0; i < count; i++ { + go mutation(i) + } + wg.Wait() + + q := `query all($a: string) { + all(func: eq(name, $a)) { + name + count + } + }` + + txn := dg.NewTxn() + res, err := txn.QueryWithVars(ctx, q, map[string]string{"$a": "Alice"}) + require.NoError(t, err) + var dat ResponseData + err = json.Unmarshal(res.Json, &dat) + require.NoError(t, err) + + require.Equal(t, len(dat.All[0].Counts), 10) +} + +func TestSequentialUpdate(t *testing.T) { + t.Log("TestSequentialUpdate") + InitData(t) + ctx := context.Background() + dg, err := testutil.DgraphClient(testutil.SockAddr) + require.NoError(t, err) + + count := 10 + mutation := func(i int) { + query := fmt.Sprintf(`query { + user as var(func: eq(name, "Alice")) + }`) + mu := &api.Mutation{ + SetNquads: []byte(fmt.Sprintf(`uid(user) "%d" .`, i)), + } + req := &api.Request{ + Query: query, + Mutations: []*api.Mutation{mu}, + CommitNow: true, + } + _, err := dg.NewTxn().Do(ctx, req) + require.NoError(t, err) + + } + for i := 0; i < count; i++ { + mutation(i) + } + + q := `query all($a: string) { + all(func: eq(name, $a)) { + name + count + } + }` + + txn := dg.NewTxn() + res, err := txn.QueryWithVars(ctx, q, map[string]string{"$a": "Alice"}) + require.NoError(t, err) + var dat ResponseData + err = json.Unmarshal(res.Json, &dat) + require.NoError(t, err) + + require.Equal(t, len(dat.All[0].Counts), 10) +} From f74c9d74db21e4abc8b494b17c7876b9db754cc5 Mon Sep 17 00:00:00 2001 From: rahulgurnani Date: Mon, 19 Oct 2020 17:21:11 +0530 Subject: [PATCH 3/4] Address review comments --- systest/ludicrous/upsert_test.go | 31 +++++++++++++++++++++---------- 1 file changed, 21 insertions(+), 10 deletions(-) diff --git a/systest/ludicrous/upsert_test.go b/systest/ludicrous/upsert_test.go index a13e5577bc5..2e2d86c68bd 100644 --- a/systest/ludicrous/upsert_test.go +++ b/systest/ludicrous/upsert_test.go @@ -1,4 +1,19 @@ -package worker +/* + * Copyright 2020 Dgraph Labs, Inc. and Contributors + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package main import ( "context" @@ -14,7 +29,6 @@ import ( type Person struct { Name string `json:"name,omitempty"` - email string count int } @@ -28,11 +42,10 @@ type ResponseData struct { func InitData(t *testing.T) { dg, err := testutil.DgraphClient(testutil.SockAddr) - testutil.DropAll(t, dg) require.NoError(t, err) + testutil.DropAll(t, dg) schema := ` name: string @index(exact) . - email: string @index(exact) . count: [int] . ` @@ -41,7 +54,6 @@ func InitData(t *testing.T) { p := Person{ Name: "Alice", - email: "alice@dgraph.io", count: 1, } pb, err := json.Marshal(p) @@ -70,9 +82,9 @@ func TestConcurrentUpdate(t *testing.T) { wg.Add(count) mutation := func(i int) { defer wg.Done() - query := fmt.Sprintf(`query { + query := `query { user as var(func: eq(name, "Alice")) - }`) + }` mu := &api.Mutation{ SetNquads: []byte(fmt.Sprintf(`uid(user) "%d" .`, i)), } @@ -107,7 +119,6 @@ func TestConcurrentUpdate(t *testing.T) { } func TestSequentialUpdate(t *testing.T) { - t.Log("TestSequentialUpdate") InitData(t) ctx := context.Background() dg, err := testutil.DgraphClient(testutil.SockAddr) @@ -115,9 +126,9 @@ func TestSequentialUpdate(t *testing.T) { count := 10 mutation := func(i int) { - query := fmt.Sprintf(`query { + query := `query { user as var(func: eq(name, "Alice")) - }`) + }` mu := &api.Mutation{ SetNquads: []byte(fmt.Sprintf(`uid(user) "%d" .`, i)), } From 541ce36ec8f88fd14b7b00f06b5366004cccd8d1 Mon Sep 17 00:00:00 2001 From: rahulgurnani Date: Mon, 19 Oct 2020 21:00:41 +0530 Subject: [PATCH 4/4] Add a space --- systest/ludicrous/upsert_test.go | 1 + 1 file changed, 1 insertion(+) diff --git a/systest/ludicrous/upsert_test.go b/systest/ludicrous/upsert_test.go index 2e2d86c68bd..d10cadd43d4 100644 --- a/systest/ludicrous/upsert_test.go +++ b/systest/ludicrous/upsert_test.go @@ -13,6 +13,7 @@ * See the License for the specific language governing permissions and * limitations under the License. */ + package main import (