Skip to content

Commit

Permalink
Add support for executing upsert block
Browse files Browse the repository at this point in the history
  • Loading branch information
mangalaman93 committed Jun 3, 2019
1 parent 7a6dad8 commit 4fe3ff1
Show file tree
Hide file tree
Showing 6 changed files with 213 additions and 8 deletions.
4 changes: 2 additions & 2 deletions dgraph/cmd/alpha/run_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -100,7 +100,7 @@ func timestamp() uint64 {
}

func processToFastJSON(q string) string {
res, err := gql.Parse(gql.Request{Str: q})
res, err := gql.Parse(gql.Request{Str: q}, nil)
if err != nil {
log.Fatal(err)
}
Expand Down Expand Up @@ -931,7 +931,7 @@ var q5 = `
`

func TestSchemaValidationError(t *testing.T) {
_, err := gql.Parse(gql.Request{Str: m5})
_, err := gql.Parse(gql.Request{Str: m5}, nil)
require.Error(t, err)
output, err := runGraphqlQuery(strings.Replace(q5, "<id>", "0x8", -1))
require.NoError(t, err)
Expand Down
88 changes: 88 additions & 0 deletions dgraph/cmd/alpha/upsert_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,88 @@
/*
* Copyright 2019 Dgraph Labs, Inc. and Contributors
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package alpha

import (
"testing"

"github.com/stretchr/testify/require"
)

func TestUpsertMutation(t *testing.T) {
require.NoError(t, dropAll())
require.NoError(t, alterSchema(`email: string @index(exact) .`))

// Mutation with wrong name
m1 := `
upsert {
mutation {
set {
uid(v) <name> "Ashihs" .
uid(v) <email> "[email protected]" .
}
}
query {
me(func: eq(email, "[email protected]")) {
v as uid
}
}
}`
keys, preds, _, err := mutationWithTs(m1, "application/rdf", false, true, true, 0)
require.NoError(t, err)
require.True(t, len(keys) == 0)
require.Contains(t, preds, "1-name")
require.Contains(t, preds, "1-email")

// query should return the wrong name
q1 := `
{
q(func: has(email)) {
uid
name
email
}
}`
res, _, err := queryWithTs(q1, "application/graphqlpm", 0)
require.NoError(t, err)
require.Contains(t, res, "Ashihs")

// mutation with correct name
m2 := `
upsert {
mutation {
set {
uid(v) <name> "Ashish" .
}
}
query {
me(func: eq(email, "[email protected]")) {
v as uid
}
}
}`
keys, preds, _, err = mutationWithTs(m2, "application/rdf", false, true, true, 0)
require.NoError(t, err)
require.True(t, len(keys) == 0)
require.Contains(t, preds, "1-name")

// query should return correct name
res, _, err = queryWithTs(q1, "application/graphqlpm", 0)
require.NoError(t, err)
require.Contains(t, res, "Ashish")
}
2 changes: 1 addition & 1 deletion edgraph/access_ee.go
Original file line number Diff line number Diff line change
Expand Up @@ -658,7 +658,7 @@ func authorizeQuery(ctx context.Context, req *api.Request) error {
parsedReq, err := gql.Parse(gql.Request{
Str: req.Query,
Variables: req.Vars,
})
}, nil)
if err != nil {
return err
}
Expand Down
114 changes: 110 additions & 4 deletions edgraph/server.go
Original file line number Diff line number Diff line change
Expand Up @@ -495,6 +495,33 @@ func (s *Server) doMutate(ctx context.Context, mu *api.Mutation, authorize bool)
}
}()

needVars := findVars(gmu)
varToUID, err := doQueryInUpsert(ctx, &l, mu.Query, needVars, mu.StartTs)
if err != nil {
return resp, err
}

// update the values in mutation block from the query block.
for _, nq := range append(gmu.Set, gmu.Del...) {
if strings.HasPrefix(nq.Subject, "uid(") {
varName := nq.Subject[4 : len(nq.Subject)-1]
if uid, ok := varToUID[varName]; ok {
nq.Subject = uid
} else {
nq.Subject = "_:" + nq.Subject
}
}

if strings.HasPrefix(nq.ObjectId, "uid(") {
varName := nq.ObjectId[4 : len(nq.ObjectId)-1]
if uid, ok := varToUID[varName]; ok {
nq.ObjectId = uid
} else {
nq.ObjectId = "_:" + nq.ObjectId
}
}
}

newUids, err := query.AssignUids(ctx, gmu.Set)
if err != nil {
return resp, err
Expand Down Expand Up @@ -554,13 +581,92 @@ func (s *Server) doMutate(ctx context.Context, mu *api.Mutation, authorize bool)
return resp, nil
}

// findVars finds all the variables used in mutation block
func findVars(gmu *gql.Mutation) []string {
vars := make(map[string]struct{})
updateVars := func(s string) {
if strings.HasPrefix(s, "uid(") {
varName := s[4 : len(s)-1]
vars[varName] = struct{}{}
}
}
for _, nq := range gmu.Set {
updateVars(nq.Subject)
updateVars(nq.ObjectId)
}
for _, nq := range gmu.Del {
updateVars(nq.Subject)
updateVars(nq.ObjectId)
}

varsList := make([]string, 0, len(vars))
for v := range vars {
varsList = append(varsList, v)
}
glog.V(3).Infof("Variables used in mutation block: %v", varsList)

return varsList
}

// doQueryInUpsert processes a query in the upsert block.
func doQueryInUpsert(ctx context.Context, l *query.Latency, queryText string,
needsVar []string, startTs uint64) (map[string]string, error) {

varToUID := make(map[string]string)
if queryText == "" {
return varToUID, nil
}

if ctx.Err() != nil {
return nil, ctx.Err()
}
x.AssertTruef(startTs != 0, "Transaction timestamp is zero")

parsedReq, err := gql.Parse(gql.Request{
Str: queryText,
Variables: make(map[string]string),
}, needsVar)
if err != nil {
return nil, err
}
if err = validateQuery(parsedReq.Query); err != nil {
return nil, err
}

qr := query.Request{
Latency: l,
GqlQuery: &parsedReq,
ReadTs: startTs,
}
if err := qr.ProcessQuery(ctx); err != nil {
return nil, errors.Wrapf(err, "while processing query: %q", queryText)
}

if len(qr.GetVars()) <= 0 {
return nil, fmt.Errorf("upsert query op has no variables")
}

// TODO(Aman): allow multiple values for each variable.
// If a variable doesn't have any UID, we generate one ourselves later.
for name, v := range qr.GetVars() {
if v.Uids == nil {
continue
}
if len(v.Uids.Uids) > 1 {
return nil, fmt.Errorf("unsupported many values for var (%s)", name)
} else if len(v.Uids.Uids) == 1 {
varToUID[name] = fmt.Sprintf("%d", v.Uids.Uids[0])
}
}

return varToUID, nil
}

func (s *Server) Query(ctx context.Context, req *api.Request) (*api.Response, error) {
if err := authorizeQuery(ctx, req); err != nil {
return nil, err
}
if glog.V(3) {
glog.Infof("Got a query: %+v", req)
}
glog.V(3).Infof("Got a query: %+v", req)

return s.doQuery(ctx, req)
}
Expand Down Expand Up @@ -610,7 +716,7 @@ func (s *Server) doQuery(ctx context.Context, req *api.Request) (resp *api.Respo
parsedReq, err := gql.Parse(gql.Request{
Str: req.Query,
Variables: req.Vars,
})
}, nil)
if err != nil {
return resp, err
}
Expand Down
8 changes: 7 additions & 1 deletion gql/parser.go
Original file line number Diff line number Diff line change
Expand Up @@ -460,7 +460,7 @@ type Result struct {

// Parse initializes and runs the lexer. It also constructs the GraphQuery subgraph
// from the lexed items.
func Parse(r Request) (res Result, rerr error) {
func Parse(r Request, needVars []string) (res Result, rerr error) {
query := r.Str
vmap := convertToVarMap(r.Variables)

Expand Down Expand Up @@ -540,6 +540,12 @@ func Parse(r Request) (res Result, rerr error) {
}

allVars := res.QueryVars
// Add the variables that are needed outside the query block.
// For example, mutation block in upsert block will be using
// variables from the query block that is getting parsed here.
if needVars != nil && len(needVars) != 0 {
allVars = append(allVars, &Vars{Needs: needVars})
}
if err := checkDependency(allVars); err != nil {
return res, err
}
Expand Down
5 changes: 5 additions & 0 deletions query/query.go
Original file line number Diff line number Diff line change
Expand Up @@ -2601,6 +2601,11 @@ type Request struct {
vars map[string]varValue
}

// GetVars returns the vars stored in Request object.
func (req *Request) GetVars() map[string]varValue {
return req.vars
}

// ProcessQuery processes query part of the request (without mutations).
// Fills Subgraphs and Vars.
// It optionally also returns a map of the allocated uids in case of an upsert request.
Expand Down

0 comments on commit 4fe3ff1

Please sign in to comment.