From f8f7c4298ce4ee2f2983119c191f017b93f79fed Mon Sep 17 00:00:00 2001 From: Rahul Gurnani Date: Sat, 24 Oct 2020 03:30:41 +0530 Subject: [PATCH] fix(Ludicrous): Upserts on list type in Dgraph (#6796) Cherry-pick of the following into release/v20.07: * fix(Ludicrous): Upserts on list type in Dgraph (#6754) * fix(test): Add sleep before running ludicrous mode tests (#6766) Fixes DGRAPH-2446. Post mortem of issue: https://discuss.dgraph.io/t/dgraph-missing-mutations-in-ludicrous-mode-post-mortem/11019 --- systest/ludicrous/upsert_test.go | 175 +++++++++++++++++++++++++++++++ worker/draft.go | 3 +- 2 files changed, 177 insertions(+), 1 deletion(-) create mode 100644 systest/ludicrous/upsert_test.go diff --git a/systest/ludicrous/upsert_test.go b/systest/ludicrous/upsert_test.go new file mode 100644 index 00000000000..129b695c883 --- /dev/null +++ b/systest/ludicrous/upsert_test.go @@ -0,0 +1,175 @@ +/* + * Copyright 2020 Dgraph Labs, Inc. and Contributors + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package main + +import ( + "context" + "encoding/json" + "fmt" + "sync" + "testing" + + "time" + + "github.com/dgraph-io/dgo/v200/protos/api" + "github.com/dgraph-io/dgraph/testutil" + "github.com/stretchr/testify/require" +) + +type Person struct { + Name string `json:"name,omitempty"` + count int +} + +type Data struct { + Name string `json:"name,omitempty"` + Counts []int `json:"count,omitempty"` +} +type ResponseData struct { + All []Data `json:"all,omitempty"` +} + +func InitData(t *testing.T) { + dg, err := testutil.DgraphClient(testutil.SockAddr) + require.NoError(t, err) + testutil.DropAll(t, dg) + schema := ` + name: string @index(exact) . + count: [int] . + ` + + err = dg.Alter(context.Background(), &api.Operation{Schema: schema}) + require.NoError(t, err) + + p := Person{ + Name: "Alice", + count: 1, + } + pb, err := json.Marshal(p) + require.NoError(t, err) + + mu := &api.Mutation{ + SetJson: pb, + CommitNow: true, + } + txn := dg.NewTxn() + ctx := context.Background() + defer txn.Discard(ctx) + + _, err = txn.Mutate(ctx, mu) + require.NoError(t, err) +} + +func TestConcurrentUpdate(t *testing.T) { + // wait for server to be ready + time.Sleep(2 * time.Second) + InitData(t) + ctx := context.Background() + dg, err := testutil.DgraphClient(testutil.SockAddr) + require.NoError(t, err) + + count := 10 + var wg sync.WaitGroup + wg.Add(count) + mutation := func(i int) { + defer wg.Done() + query := `query { + user as var(func: eq(name, "Alice")) + }` + mu := &api.Mutation{ + SetNquads: []byte(fmt.Sprintf(`uid(user) "%d" .`, i)), + } + req := &api.Request{ + Query: query, + Mutations: []*api.Mutation{mu}, + CommitNow: true, + } + _, err := dg.NewTxn().Do(ctx, req) + require.NoError(t, err) + } + for i := 0; i < count; i++ { + go mutation(i) + } + wg.Wait() + // eventual consistency + time.Sleep(2 * time.Second) + + q := `query all($a: string) { + all(func: eq(name, $a)) { + name + count + } + }` + + txn := dg.NewTxn() + res, err := txn.QueryWithVars(ctx, q, map[string]string{"$a": "Alice"}) + require.NoError(t, err) + var dat ResponseData + err = json.Unmarshal(res.Json, &dat) + require.NoError(t, err) + + require.Equal(t, 10, len(dat.All[0].Counts)) +} + +func TestSequentialUpdate(t *testing.T) { + // wait for server to be ready + time.Sleep(2 * time.Second) + InitData(t) + ctx := context.Background() + dg, err := testutil.DgraphClient(testutil.SockAddr) + require.NoError(t, err) + + count := 10 + mutation := func(i int) { + query := `query { + user as var(func: eq(name, "Alice")) + }` + mu := &api.Mutation{ + SetNquads: []byte(fmt.Sprintf(`uid(user) "%d" .`, i)), + } + req := &api.Request{ + Query: query, + Mutations: []*api.Mutation{mu}, + CommitNow: true, + } + _, err := dg.NewTxn().Do(ctx, req) + require.NoError(t, err) + + } + for i := 0; i < count; i++ { + mutation(i) + } + + // eventual consistency + time.Sleep(1 * time.Second) + + q := `query all($a: string) { + all(func: eq(name, $a)) { + name + count + } + }` + + txn := dg.NewTxn() + res, err := txn.QueryWithVars(ctx, q, map[string]string{"$a": "Alice"}) + require.NoError(t, err) + var dat ResponseData + err = json.Unmarshal(res.Json, &dat) + require.NoError(t, err) + + require.Equal(t, 10, len(dat.All[0].Counts)) +} diff --git a/worker/draft.go b/worker/draft.go index ef9fdf65aab..1c8eaed077b 100644 --- a/worker/draft.go +++ b/worker/draft.go @@ -661,7 +661,8 @@ func (n *node) processApplyCh() { psz := proposal.Size() totalSize += int64(psz) - if x.WorkerConfig.LudicrousMode && proposal.Mutations != nil && proposal.Mutations.StartTs == 0 { + // In case of upserts the startTs would be > 0, so, no need to check startTs is 0 + if x.WorkerConfig.LudicrousMode && proposal.Mutations != nil { proposal.Mutations.StartTs = State.GetTimestamp(false) }