Skip to content

feat(GraphQL): Webhooks on add/update/delete mutations (GRAPHQL-1045) #7494

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Merged
merged 19 commits into from
Mar 19, 2021
Merged
Show file tree
Hide file tree
Changes from 10 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion edgraph/access_ee.go
Original file line number Diff line number Diff line change
Expand Up @@ -579,7 +579,7 @@ func extractUserAndGroups(ctx context.Context) ([]string, error) {
if err != nil {
return nil, err
}
return validateToken(accessJwt[0])
return validateToken(accessJwt)
}

type authPredResult struct {
Expand Down
7 changes: 7 additions & 0 deletions graphql/admin/add_group.go
Original file line number Diff line number Diff line change
Expand Up @@ -60,6 +60,13 @@ func (mrw *addGroupRewriter) FromMutationResult(
return ((*resolve.AddRewriter)(mrw)).FromMutationResult(ctx, mutation, assigned, result)
}

func (mrw *addGroupRewriter) MutatedRootUIDs(
mutation schema.Mutation,
assigned map[string]string,
result map[string]interface{}) []string {
return ((*resolve.AddRewriter)(mrw)).MutatedRootUIDs(mutation, assigned, result)
}

// removeDuplicateRuleRef removes duplicate rules based on predicate value.
// for duplicate rules, only the last rule with duplicate predicate name is preserved.
func removeDuplicateRuleRef(rules []interface{}) ([]interface{}, x.GqlErrorList) {
Expand Down
2 changes: 1 addition & 1 deletion graphql/admin/current_user.go
Original file line number Diff line number Diff line change
Expand Up @@ -35,7 +35,7 @@ func extractName(ctx context.Context) (string, error) {
return "", err
}

return x.ExtractUserName(accessJwt[0])
return x.ExtractUserName(accessJwt)
}

func (gsr *currentUserResolver) Rewrite(ctx context.Context,
Expand Down
7 changes: 7 additions & 0 deletions graphql/admin/update_group.go
Original file line number Diff line number Diff line change
Expand Up @@ -155,6 +155,13 @@ func (urw *updateGroupRewriter) FromMutationResult(
return ((*resolve.UpdateRewriter)(urw)).FromMutationResult(ctx, mutation, assigned, result)
}

func (urw *updateGroupRewriter) MutatedRootUIDs(
mutation schema.Mutation,
assigned map[string]string,
result map[string]interface{}) []string {
return ((*resolve.UpdateRewriter)(urw)).MutatedRootUIDs(mutation, assigned, result)
}

// addAclRuleQuery adds a *gql.GraphQuery to upsertQuery.Children to query a rule inside a group
// based on its predicate value.
func addAclRuleQuery(upsertQuery []*gql.GraphQuery, predicate, variable string) {
Expand Down
1 change: 1 addition & 0 deletions graphql/e2e/schema/apollo_service_response.graphql
Original file line number Diff line number Diff line change
Expand Up @@ -205,6 +205,7 @@ directive @remote on OBJECT | INTERFACE | UNION | INPUT_OBJECT | ENUM
directive @remoteResponse(name: String) on FIELD_DEFINITION
directive @cascade(fields: [String]) on FIELD
directive @lambda on FIELD_DEFINITION
directive @lambdaOnMutate(add: Boolean, update: Boolean, delete: Boolean) on OBJECT | INTERFACE
directive @cacheControl(maxAge: Int!) on QUERY

input IntFilter {
Expand Down
1 change: 1 addition & 0 deletions graphql/e2e/schema/generatedSchema.graphql
Original file line number Diff line number Diff line change
Expand Up @@ -193,6 +193,7 @@ directive @remote on OBJECT | INTERFACE | UNION | INPUT_OBJECT | ENUM
directive @remoteResponse(name: String) on FIELD_DEFINITION
directive @cascade(fields: [String]) on FIELD
directive @lambda on FIELD_DEFINITION
directive @lambdaOnMutate(add: Boolean, update: Boolean, delete: Boolean) on OBJECT | INTERFACE
directive @cacheControl(maxAge: Int!) on QUERY
directive @generate(
query: GenerateQueryParams,
Expand Down
11 changes: 11 additions & 0 deletions graphql/resolve/mutation.go
Original file line number Diff line number Diff line change
Expand Up @@ -112,6 +112,11 @@ type MutationRewriter interface {
m schema.Mutation,
assigned map[string]string,
result map[string]interface{}) ([]*gql.GraphQuery, error)
// MutatedRootUIDs returns a list of Root UIDs that were mutated as part of the mutation.
MutatedRootUIDs(
mutation schema.Mutation,
assigned map[string]string,
result map[string]interface{}) []string
}

// A DgraphExecutor can execute a query/mutation and returns the request response and any errors.
Expand Down Expand Up @@ -422,6 +427,12 @@ func (mr *dgraphResolver) rewriteAndExecute(
}
commit = true

// once committed, send async updates to configured webhooks, if any.
if mutation.HasLambdaOnMutate() {
rootUIDs := mr.mutationRewriter.MutatedRootUIDs(mutation, mutResp.GetUids(), result)
go sendWebhookEvent(ctx, mutation, mutResp.Txn.CommitTs, rootUIDs)
}

// For delete mutation, we would have already populated qryResp if query field was requested.
if mutation.MutationType() != schema.DeleteMutation {
queryTimer := newtimer(ctx, &dgraphQueryDuration.OffsetDuration)
Expand Down
139 changes: 76 additions & 63 deletions graphql/resolve/mutation_rewriter.go
Original file line number Diff line number Diff line change
Expand Up @@ -249,12 +249,12 @@ func (xidMetadata *xidMetadata) isDuplicateXid(atTopLevel bool, xidVar string,
// simply ignored.
// If it is found out that the Person with id 0x123 does not exist, the corresponding
// mutation will fail.
func (mrw *AddRewriter) RewriteQueries(
func (arw *AddRewriter) RewriteQueries(
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

👍

ctx context.Context,
m schema.Mutation) ([]*gql.GraphQuery, error) {

mrw.VarGen = NewVariableGenerator()
mrw.XidMetadata = NewXidMetadata()
arw.VarGen = NewVariableGenerator()
arw.XidMetadata = NewXidMetadata()

mutatedType := m.MutatedType()
val, _ := m.ArgValue(schema.InputArgName).([]interface{})
Expand All @@ -264,7 +264,7 @@ func (mrw *AddRewriter) RewriteQueries(

for _, i := range val {
obj := i.(map[string]interface{})
queries, errs := existenceQueries(ctx, mutatedType, nil, mrw.VarGen, obj, mrw.XidMetadata)
queries, errs := existenceQueries(ctx, mutatedType, nil, arw.VarGen, obj, arw.XidMetadata)
if len(errs) > 0 {
var gqlErrors x.GqlErrorList
for _, err := range errs {
Expand Down Expand Up @@ -394,7 +394,7 @@ func (urw *UpdateRewriter) RewriteQueries(
// } ],
// "Author.friends":[ {"uid":"0x123"} ],
// }
func (mrw *AddRewriter) Rewrite(
func (arw *AddRewriter) Rewrite(
ctx context.Context,
m schema.Mutation,
idExistence map[string]string) ([]*UpsertMutation, error) {
Expand All @@ -403,8 +403,8 @@ func (mrw *AddRewriter) Rewrite(
mutatedType := m.MutatedType()
val, _ := m.ArgValue(schema.InputArgName).([]interface{})

varGen := mrw.VarGen
xidMetadata := mrw.XidMetadata
varGen := arw.VarGen
xidMetadata := arw.XidMetadata
// ret stores a slice of Upsert Mutations. These are used in executing upsert queries in graphql/resolve/mutation.go
var ret []*UpsertMutation
// fragments stores a slice of mutationFragments. This is used in constructing mutationsAll which is returned back to the caller
Expand Down Expand Up @@ -472,7 +472,7 @@ func (mrw *AddRewriter) Rewrite(
}
if fragment != nil {
fragments = append(fragments, fragment)
mrw.frags = append(mrw.frags, []*mutationFragment{fragment})
arw.frags = append(arw.frags, []*mutationFragment{fragment})
}
}

Expand Down Expand Up @@ -696,58 +696,22 @@ func (urw *UpdateRewriter) Rewrite(
}

// FromMutationResult rewrites the query part of a GraphQL add mutation into a Dgraph query.
func (mrw *AddRewriter) FromMutationResult(
func (arw *AddRewriter) FromMutationResult(
ctx context.Context,
mutation schema.Mutation,
assigned map[string]string,
result map[string]interface{}) ([]*gql.GraphQuery, error) {

var errs error

// This stores a list of added or updated uids.
uids := make([]uint64, 0)

// Add any newly added uids.
for _, frag := range mrw.frags {
for _, frag := range arw.frags {
err := checkResult(frag, result)
errs = schema.AppendGQLErrs(errs, err)
if err != nil {
continue
}

node := strings.TrimPrefix(frag[0].
fragment.(map[string]interface{})["uid"].(string), "_:")
val, ok := assigned[node]
if !ok {
continue
}
uid, err := strconv.ParseUint(val, 0, 64)
if err != nil {
errs = schema.AppendGQLErrs(errs, schema.GQLWrapf(err,
"received %s as an assigned uid from Dgraph,"+
" but couldn't parse it as uint64",
assigned[node]))
}

uids = append(uids, uid)
}

// Extract and add any updated uids. This is done for upsert With Add Mutation.
// In this case, it may happen that no new node is created, but there may still
// be some updated nodes. We get these nodes over here and add to uid list.
mutated := extractMutated(result, mutation.Name())
if len(mutated) > 0 {
// This is the case of a conditional upsert where we should get uids from mutated.
for _, id := range mutated {
uid, err := strconv.ParseUint(id, 0, 64)
if err != nil {
return nil, schema.GQLWrapf(err,
"received %s as an updated uid from Dgraph, but couldn't parse it as "+
"uint64", id)
}
uids = append(uids, uid)
}
}
// Find any newly added/updated rootUIDs.
uids, err := convertIDsWithErr(arw.MutatedRootUIDs(mutation, assigned, result))
errs = schema.AppendGQLErrs(errs, err)

// Find out if its an upsert with Add mutation.
upsert := false
Expand Down Expand Up @@ -795,20 +759,9 @@ func (urw *UpdateRewriter) FromMutationResult(
return nil, err
}

mutated := extractMutated(result, mutation.Name())

var uids []uint64
if len(mutated) > 0 {
// This is the case of a conditional upsert where we should get uids from mutated.
for _, id := range mutated {
uid, err := strconv.ParseUint(id, 0, 64)
if err != nil {
return nil, schema.GQLWrapf(err,
"received %s as an updated uid from Dgraph, but couldn't parse it as "+
"uint64", id)
}
uids = append(uids, uid)
}
uids, err := convertIDsWithErr(urw.MutatedRootUIDs(mutation, assigned, result))
if err != nil {
return nil, err
}

customClaims, err := mutation.GetAuthMeta().ExtractCustomClaims(ctx)
Expand All @@ -826,6 +779,58 @@ func (urw *UpdateRewriter) FromMutationResult(
return rewriteAsQueryByIds(mutation.QueryField(), uids, authRw), nil
}

func (arw *AddRewriter) MutatedRootUIDs(
mutation schema.Mutation,
assigned map[string]string,
result map[string]interface{}) []string {

var rootUIDs []string // This stores a list of added or updated rootUIDs.

// Add any newly added rootUIDs.
for _, frag := range arw.frags {
blankNodeName := strings.TrimPrefix(frag[0].
fragment.(map[string]interface{})["uid"].(string), "_:")
uid, ok := assigned[blankNodeName]
if ok {
rootUIDs = append(rootUIDs, uid)
}
}

// Extract and add any updated rootUIDs. This is done for upsert With Add Mutation.
// In this case, it may happen that no new node is created, but there may still
// be some updated nodes. We get these nodes over here and add to uid list.
rootUIDs = append(rootUIDs, extractMutated(result, mutation.Name())...)

return rootUIDs
}

func (urw *UpdateRewriter) MutatedRootUIDs(
mutation schema.Mutation,
assigned map[string]string,
result map[string]interface{}) []string {

return extractMutated(result, mutation.Name())
}

func convertIDsWithErr(uidSlice []string) ([]uint64, error) {
var errs error
uids := make([]uint64, 0, len(uidSlice))

if len(uidSlice) > 0 {
for _, id := range uidSlice {
uid, err := strconv.ParseUint(id, 0, 64)
if err != nil {
errs = schema.AppendGQLErrs(errs, schema.GQLWrapf(err,
"received %s as a uid from Dgraph, but couldn't parse it as uint64", id))
continue
}
uids = append(uids, uid)
}
}

return uids, errs
}

func extractMutated(result map[string]interface{}, mutatedField string) []string {
var mutated []string

Expand Down Expand Up @@ -1097,6 +1102,14 @@ func (drw *deleteRewriter) FromMutationResult(
return nil, nil
}

func (drw *deleteRewriter) MutatedRootUIDs(
mutation schema.Mutation,
assigned map[string]string,
result map[string]interface{}) []string {

return extractMutated(result, mutation.Name())
}

// RewriteQueries on deleteRewriter does not return any queries. queries to check
// existence of nodes are not needed as part of Delete Mutation.
// The function generates VarGen and XidMetadata which are used in Rewrite function.
Expand Down
Loading