Skip to content

Commit

Permalink
Allow blanknodes with variables in mutation
Browse files Browse the repository at this point in the history
  • Loading branch information
ashwin95r committed Jul 12, 2017
1 parent 333af81 commit 8a802a4
Show file tree
Hide file tree
Showing 3 changed files with 37 additions and 49 deletions.
10 changes: 8 additions & 2 deletions cmd/dgraph/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -349,15 +349,21 @@ func shareHandler(w http.ResponseWriter, r *http.Request) {
x.SetStatus(w, x.Error, err.Error())
}
nquads := gql.WrapNQ(NewSharedQueryNQuads(rawQuery), protos.DirectedEdge_SET)
if mr, err = query.ToInternal(ctx, nquads, nil); err != nil {
newUids, err := query.AssignUids(ctx, nquads)
if err != nil {
fail()
return
}
if mr, err = query.ToInternal(ctx, nquads, nil, newUids); err != nil {
fail()
return
}
if err = query.ApplyMutations(ctx, &protos.Mutations{Edges: mr.Edges}); err != nil {
fail()
return
}
allocIdsStr := query.ConvertUidsToHex(mr.NewUids)
tempMap := query.StripBlankNode(newUids)
allocIdsStr := query.ConvertUidsToHex(tempMap)
payload := map[string]interface{}{
"code": x.Success,
"message": "Done",
Expand Down
39 changes: 4 additions & 35 deletions query/mutation.go
Original file line number Diff line number Diff line change
Expand Up @@ -16,8 +16,7 @@ import (
var expandEdge = flag.Bool("expand_edge", true, "Don't store predicates per node.")

type InternalMutation struct {
Edges []*protos.DirectedEdge
NewUids map[string]uint64
Edges []*protos.DirectedEdge
}

func (mr *InternalMutation) AddEdge(edge *protos.DirectedEdge, op protos.DirectedEdge_Op) {
Expand Down Expand Up @@ -175,16 +174,14 @@ func expandVariables(nq *gql.NQuad,
}
return nq.ExpandVariables(newUids, subjectUids, objectUids)
}

func ToInternal(ctx context.Context,
nquads gql.NQuads,
vars map[string]varValue) (InternalMutation, error) {
vars map[string]varValue, newUids map[string]uint64) (InternalMutation, error) {
var mr InternalMutation
var err error
var newUids map[string]uint64

if newUids, err = AssignUids(ctx, nquads); err != nil {
return mr, err
if newUids == nil {
newUids = make(map[string]uint64)
}

// Wrapper for a pointer to protos.Nquad
Expand Down Expand Up @@ -230,33 +227,5 @@ func ToInternal(ctx context.Context,
mr.AddEdge(edge, nquads.Types[i])
}

mr.NewUids = make(map[string]uint64)
// Strip out _: prefix from the blank node keys.
for k, v := range newUids {
if strings.HasPrefix(k, "_:") {
mr.NewUids[k[2:]] = v
}
}
return mr, nil
}

// ConvertAndApply materializes edges defined by the mutation
// and adds them to the database.
func ConvertAndApply(ctx context.Context, mutation *protos.Mutation) (map[string]uint64, error) {
var err error
var mr InternalMutation

set := gql.WrapNQ(mutation.Set, protos.DirectedEdge_SET)
del := gql.WrapNQ(mutation.Del, protos.DirectedEdge_DEL)
all := set.Add(del)

if mr, err = ToInternal(ctx, all, nil); err != nil {
return nil, err
}
var m = protos.Mutations{Edges: mr.Edges, Schema: mutation.Schema}

if err := ApplyMutations(ctx, &m); err != nil {
return nil, x.Wrap(err)
}
return mr.NewUids, nil
}
37 changes: 25 additions & 12 deletions query/query.go
Original file line number Diff line number Diff line change
Expand Up @@ -2276,22 +2276,22 @@ func (qr *QueryRequest) prepareMutation() (err error) {
return
}

func (qr *QueryRequest) processNquads(ctx context.Context, nquads gql.NQuads) (map[string]uint64, error) {
func (qr *QueryRequest) processNquads(ctx context.Context, nquads gql.NQuads, newUids map[string]uint64) error {
var err error
var mr InternalMutation
if !nquads.IsEmpty() {
if mr, err = ToInternal(ctx, nquads, qr.vars); err != nil {
return mr.NewUids, x.Wrapf(&InternalError{err: err}, "failed to convert NQuads to edges")
if mr, err = ToInternal(ctx, nquads, qr.vars, newUids); err != nil {
return x.Wrapf(&InternalError{err: err}, "failed to convert NQuads to edges")
}
}
if tr, ok := trace.FromContext(ctx); ok {
tr.LazyPrintf("converted nquads to directed edges")
}
m := protos.Mutations{Edges: mr.Edges, Schema: qr.SchemaUpdate}
if err = ApplyMutations(ctx, &m); err != nil {
return mr.NewUids, x.Wrapf(&InternalError{err: err}, "failed to apply mutations")
return x.Wrapf(&InternalError{err: err}, "failed to apply mutations")
}
return mr.NewUids, nil
return nil
}

type ExecuteResult struct {
Expand All @@ -2308,6 +2308,7 @@ func (qr *QueryRequest) ProcessWithMutation(ctx context.Context) (er ExecuteResu
}

var depSet, indepSet, depDel, indepDel gql.NQuads
var newUids map[string]uint64
if qr.GqlQuery.Mutation != nil {
if qr.GqlQuery.Mutation.HasOps() && !mutationAllowed {
return er, x.Wrap(&InvalidRequestError{err: MutationNotAllowedErr})
Expand All @@ -2324,7 +2325,14 @@ func (qr *QueryRequest) ProcessWithMutation(ctx context.Context) (er ExecuteResu
Partition(gql.HasVariables)

nquads := indepSet.Add(indepDel)
er.Allocations, err = qr.processNquads(ctx, nquads)
nquadsTemp := nquads.Add(depDel).Add(depSet)
if newUids, err = AssignUids(ctx, nquadsTemp); err != nil {
return er, err
}

er.Allocations = StripBlankNode(newUids)

err = qr.processNquads(ctx, nquads, newUids)
if err != nil {
return er, err
}
Expand All @@ -2342,14 +2350,9 @@ func (qr *QueryRequest) ProcessWithMutation(ctx context.Context) (er ExecuteResu

nquads := depSet.Add(depDel)
if !nquads.IsEmpty() {
allocations, err := qr.processNquads(ctx, nquads)
if err != nil {
if err = qr.processNquads(ctx, nquads, newUids); err != nil {
return er, err
}
if len(allocations) > 0 {
return er, x.Wrapf(&InvalidRequestError{err: err},
"adding nodes when using variables is currently not supported")
}
}

if qr.GqlQuery.Schema != nil {
Expand All @@ -2359,3 +2362,13 @@ func (qr *QueryRequest) ProcessWithMutation(ctx context.Context) (er ExecuteResu
}
return er, nil
}

func StripBlankNode(mp map[string]uint64) map[string]uint64 {
temp := make(map[string]uint64)
for k, v := range mp {
if strings.HasPrefix(k, "_:") {
temp[k[2:]] = v
}
}
return temp
}

0 comments on commit 8a802a4

Please sign in to comment.