Skip to content

Commit

Permalink
fix(Ludicrous): Upserts on list type in Dgraph (#6754)
Browse files Browse the repository at this point in the history
* Fix DGRAPH-2446: Dgraph missing upserts in ludicrous mode

* Add tests

* Address review comments

* Add a space
  • Loading branch information
rahulgurnani committed Oct 23, 2020
1 parent 6da95dc commit edae1c6
Show file tree
Hide file tree
Showing 2 changed files with 166 additions and 1 deletion.
164 changes: 164 additions & 0 deletions systest/ludicrous/upsert_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,164 @@
/*
* Copyright 2020 Dgraph Labs, Inc. and Contributors
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package main

import (
"context"
"encoding/json"
"fmt"
"sync"
"testing"

"github.com/dgraph-io/dgo/v200/protos/api"
"github.com/dgraph-io/dgraph/testutil"
"github.com/stretchr/testify/require"
)

type Person struct {
Name string `json:"name,omitempty"`
count int
}

type Data struct {
Name string `json:"name,omitempty"`
Counts []int `json:"count,omitempty"`
}
type ResponseData struct {
All []Data `json:"all,omitempty"`
}

func InitData(t *testing.T) {
dg, err := testutil.DgraphClient(testutil.SockAddr)
require.NoError(t, err)
testutil.DropAll(t, dg)
schema := `
name: string @index(exact) .
count: [int] .
`

err = dg.Alter(context.Background(), &api.Operation{Schema: schema})
require.NoError(t, err)

p := Person{
Name: "Alice",
count: 1,
}
pb, err := json.Marshal(p)
require.NoError(t, err)

mu := &api.Mutation{
SetJson: pb,
CommitNow: true,
}
txn := dg.NewTxn()
ctx := context.Background()
defer txn.Discard(ctx)

_, err = txn.Mutate(ctx, mu)
require.NoError(t, err)
}

func TestConcurrentUpdate(t *testing.T) {
InitData(t)
ctx := context.Background()
dg, err := testutil.DgraphClient(testutil.SockAddr)
require.NoError(t, err)

count := 10
var wg sync.WaitGroup
wg.Add(count)
mutation := func(i int) {
defer wg.Done()
query := `query {
user as var(func: eq(name, "Alice"))
}`
mu := &api.Mutation{
SetNquads: []byte(fmt.Sprintf(`uid(user) <count> "%d" .`, i)),
}
req := &api.Request{
Query: query,
Mutations: []*api.Mutation{mu},
CommitNow: true,
}
_, err := dg.NewTxn().Do(ctx, req)
require.NoError(t, err)
}
for i := 0; i < count; i++ {
go mutation(i)
}
wg.Wait()

q := `query all($a: string) {
all(func: eq(name, $a)) {
name
count
}
}`

txn := dg.NewTxn()
res, err := txn.QueryWithVars(ctx, q, map[string]string{"$a": "Alice"})
require.NoError(t, err)
var dat ResponseData
err = json.Unmarshal(res.Json, &dat)
require.NoError(t, err)

require.Equal(t, len(dat.All[0].Counts), 10)
}

func TestSequentialUpdate(t *testing.T) {
InitData(t)
ctx := context.Background()
dg, err := testutil.DgraphClient(testutil.SockAddr)
require.NoError(t, err)

count := 10
mutation := func(i int) {
query := `query {
user as var(func: eq(name, "Alice"))
}`
mu := &api.Mutation{
SetNquads: []byte(fmt.Sprintf(`uid(user) <count> "%d" .`, i)),
}
req := &api.Request{
Query: query,
Mutations: []*api.Mutation{mu},
CommitNow: true,
}
_, err := dg.NewTxn().Do(ctx, req)
require.NoError(t, err)

}
for i := 0; i < count; i++ {
mutation(i)
}

q := `query all($a: string) {
all(func: eq(name, $a)) {
name
count
}
}`

txn := dg.NewTxn()
res, err := txn.QueryWithVars(ctx, q, map[string]string{"$a": "Alice"})
require.NoError(t, err)
var dat ResponseData
err = json.Unmarshal(res.Json, &dat)
require.NoError(t, err)

require.Equal(t, len(dat.All[0].Counts), 10)
}
3 changes: 2 additions & 1 deletion worker/draft.go
Original file line number Diff line number Diff line change
Expand Up @@ -622,7 +622,8 @@ func (n *node) processApplyCh() {
psz := proposal.Size()
totalSize += int64(psz)

if x.WorkerConfig.LudicrousMode && proposal.Mutations != nil && proposal.Mutations.StartTs == 0 {
// In case of upserts the startTs would be > 0, so, no need to check startTs is 0
if x.WorkerConfig.LudicrousMode && proposal.Mutations != nil {
proposal.Mutations.StartTs = State.GetTimestamp(false)
}

Expand Down

0 comments on commit edae1c6

Please sign in to comment.