Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat(GraphQL): Webhooks on add/update/delete mutations (GRAPHQL-1045) #7494

Merged
merged 19 commits into from
Mar 19, 2021
Merged
Show file tree
Hide file tree
Changes from 18 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion edgraph/access_ee.go
Original file line number Diff line number Diff line change
Expand Up @@ -579,7 +579,7 @@ func extractUserAndGroups(ctx context.Context) ([]string, error) {
if err != nil {
return nil, err
}
return validateToken(accessJwt[0])
return validateToken(accessJwt)
}

type authPredResult struct {
Expand Down
7 changes: 7 additions & 0 deletions graphql/admin/add_group.go
Original file line number Diff line number Diff line change
Expand Up @@ -60,6 +60,13 @@ func (mrw *addGroupRewriter) FromMutationResult(
return ((*resolve.AddRewriter)(mrw)).FromMutationResult(ctx, mutation, assigned, result)
}

func (mrw *addGroupRewriter) MutatedRootUIDs(
mutation schema.Mutation,
assigned map[string]string,
result map[string]interface{}) []string {
return ((*resolve.AddRewriter)(mrw)).MutatedRootUIDs(mutation, assigned, result)
}

// removeDuplicateRuleRef removes duplicate rules based on predicate value.
// for duplicate rules, only the last rule with duplicate predicate name is preserved.
func removeDuplicateRuleRef(rules []interface{}) ([]interface{}, x.GqlErrorList) {
Expand Down
18 changes: 9 additions & 9 deletions graphql/admin/admin.go
Original file line number Diff line number Diff line change
Expand Up @@ -397,7 +397,7 @@ func SchemaValidate(sch string) error {
return err
}

_, err = schema.FromString(schHandler.GQLSchema())
_, err = schema.FromString(schHandler.GQLSchema(), x.GalaxyNamespace)
return err
}

Expand Down Expand Up @@ -460,7 +460,7 @@ type adminServer struct {
// main /graphql endpoint and an admin server. The result is mainServer, adminServer.
func NewServers(withIntrospection bool, globalEpoch map[uint64]*uint64,
closer *z.Closer) (IServeGraphQL, IServeGraphQL, *GraphQLHealthStore) {
gqlSchema, err := schema.FromString("")
gqlSchema, err := schema.FromString("", x.GalaxyNamespace)
if err != nil {
x.Panic(err)
}
Expand Down Expand Up @@ -493,7 +493,7 @@ func newAdminResolver(
epoch map[uint64]*uint64,
closer *z.Closer) *resolve.RequestResolver {

adminSchema, err := schema.FromString(graphqlAdminSchema)
adminSchema, err := schema.FromString(graphqlAdminSchema, x.GalaxyNamespace)
if err != nil {
x.Panic(err)
}
Expand Down Expand Up @@ -561,7 +561,7 @@ func newAdminResolver(
var gqlSchema schema.Schema
// on drop_all, we will receive an empty string as the schema update
if newSchema.Schema != "" {
gqlSchema, err = generateGQLSchema(newSchema)
gqlSchema, err = generateGQLSchema(newSchema, ns)
if err != nil {
glog.Errorf("Error processing GraphQL schema: %s. ", err)
return
Expand Down Expand Up @@ -659,13 +659,13 @@ func getCurrentGraphQLSchema(namespace uint64) (*gqlSchema, error) {
return &gqlSchema{ID: uid, Schema: graphQLSchema}, nil
}

func generateGQLSchema(sch *gqlSchema) (schema.Schema, error) {
func generateGQLSchema(sch *gqlSchema, ns uint64) (schema.Schema, error) {
schHandler, err := schema.NewHandler(sch.Schema, false)
if err != nil {
return nil, err
}
sch.GeneratedSchema = schHandler.GQLSchema()
generatedSchema, err := schema.FromString(sch.GeneratedSchema)
generatedSchema, err := schema.FromString(sch.GeneratedSchema, ns)
if err != nil {
return nil, err
}
Expand Down Expand Up @@ -709,7 +709,7 @@ func (as *adminServer) initServer() {
break
}

generatedSchema, err := generateGQLSchema(sch)
generatedSchema, err := generateGQLSchema(sch, x.GalaxyNamespace)
if err != nil {
glog.Infof("Error processing GraphQL schema: %s.", err)
break
Expand Down Expand Up @@ -818,7 +818,7 @@ func (as *adminServer) resetSchema(ns uint64, gqlSchema schema.Schema) {
// introspection operations, and set GQL schema to empty.
if gqlSchema == nil {
resolverFactory = resolverFactoryWithErrorMsg(errNoGraphQLSchema)
gqlSchema, _ = schema.FromString("")
gqlSchema, _ = schema.FromString("", ns)
} else {
resolverFactory = resolverFactoryWithErrorMsg(errResolverNotFound).
WithConventionResolvers(gqlSchema, as.fns)
Expand Down Expand Up @@ -874,7 +874,7 @@ func (as *adminServer) lazyLoadSchema(namespace uint64) {
return
}

generatedSchema, err := generateGQLSchema(sch)
generatedSchema, err := generateGQLSchema(sch, namespace)
if err != nil {
glog.Infof("Error processing GraphQL schema: %s.", err)
return
Expand Down
2 changes: 1 addition & 1 deletion graphql/admin/current_user.go
Original file line number Diff line number Diff line change
Expand Up @@ -35,7 +35,7 @@ func extractName(ctx context.Context) (string, error) {
return "", err
}

return x.ExtractUserName(accessJwt[0])
return x.ExtractUserName(accessJwt)
}

func (gsr *currentUserResolver) Rewrite(ctx context.Context,
Expand Down
3 changes: 2 additions & 1 deletion graphql/admin/schema.go
Original file line number Diff line number Diff line change
Expand Up @@ -55,7 +55,8 @@ func (usr *updateSchemaResolver) Resolve(ctx context.Context, m schema.Mutation)
return resolve.EmptyResult(m, err), false
}

if _, err = schema.FromString(schHandler.GQLSchema()); err != nil {
// we don't need the correct namespace for validation, so passing the Galaxy namespace
if _, err = schema.FromString(schHandler.GQLSchema(), x.GalaxyNamespace); err != nil {
return resolve.EmptyResult(m, err), false
}

Expand Down
7 changes: 7 additions & 0 deletions graphql/admin/update_group.go
Original file line number Diff line number Diff line change
Expand Up @@ -155,6 +155,13 @@ func (urw *updateGroupRewriter) FromMutationResult(
return ((*resolve.UpdateRewriter)(urw)).FromMutationResult(ctx, mutation, assigned, result)
}

func (urw *updateGroupRewriter) MutatedRootUIDs(
mutation schema.Mutation,
assigned map[string]string,
result map[string]interface{}) []string {
return ((*resolve.UpdateRewriter)(urw)).MutatedRootUIDs(mutation, assigned, result)
}

// addAclRuleQuery adds a *gql.GraphQuery to upsertQuery.Children to query a rule inside a group
// based on its predicate value.
func addAclRuleQuery(upsertQuery []*gql.GraphQuery, predicate, variable string) {
Expand Down
6 changes: 3 additions & 3 deletions graphql/dgraph/execute.go
Original file line number Diff line number Diff line change
Expand Up @@ -64,7 +64,7 @@ func (dg *DgraphEx) Execute(ctx context.Context, req *dgoapi.Request,
}

// CommitOrAbort is the underlying dgraph implementation for committing a Dgraph transaction
func (dg *DgraphEx) CommitOrAbort(ctx context.Context, tc *dgoapi.TxnContext) error {
_, err := (&edgraph.Server{}).CommitOrAbort(ctx, tc)
return err
func (dg *DgraphEx) CommitOrAbort(ctx context.Context,
tc *dgoapi.TxnContext) (*dgoapi.TxnContext, error) {
return (&edgraph.Server{}).CommitOrAbort(ctx, tc)
}
8 changes: 6 additions & 2 deletions graphql/e2e/common/common.go
Original file line number Diff line number Diff line change
Expand Up @@ -51,6 +51,9 @@ var (
dgraphHealthURL = "http://" + Alpha1HTTP + "/health?all"
dgraphStateURL = "http://" + Alpha1HTTP + "/state"

// this port is used on the host machine to spin up a test HTTP server
lambdaHookServerAddr = ":8888"

retryableUpdateGQLSchemaErrors = []string{
"errIndexingInProgress",
"is already running",
Expand Down Expand Up @@ -797,8 +800,8 @@ func RunAll(t *testing.T) {
t.Run("query only typename", queryOnlyTypename)
t.Run("query nested only typename", querynestedOnlyTypename)
t.Run("test onlytypename for interface types", onlytypenameForInterface)
t.Run("entitites Query on extended type with key field of type String", entitiesQueryWithKeyFieldOfTypeString)
t.Run("entitites Query on extended type with key field of type Int", entitiesQueryWithKeyFieldOfTypeInt)
t.Run("entities Query on extended type with key field of type String", entitiesQueryWithKeyFieldOfTypeString)
t.Run("entities Query on extended type with key field of type Int", entitiesQueryWithKeyFieldOfTypeInt)
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

👍


t.Run("get state by xid", getStateByXid)
t.Run("get state without args", getStateWithoutArgs)
Expand Down Expand Up @@ -915,6 +918,7 @@ func RunAll(t *testing.T) {
t.Run("lambda on mutation using graphql", lambdaOnMutationUsingGraphQL)
t.Run("query lambda field in a mutation with duplicate @id", lambdaInMutationWithDuplicateId)
t.Run("lambda with apollo federation", lambdaWithApolloFederation)
t.Run("lambdaOnMutate hooks", lambdaOnMutateHooks)
}

func gunzipData(data []byte) ([]byte, error) {
Expand Down
5 changes: 3 additions & 2 deletions graphql/e2e/common/error.go
Original file line number Diff line number Diff line change
Expand Up @@ -319,8 +319,9 @@ func (dg *panicClient) Execute(ctx context.Context, req *dgoapi.Request,
return nil, nil
}

func (dg *panicClient) CommitOrAbort(ctx context.Context, tc *dgoapi.TxnContext) error {
return nil
func (dg *panicClient) CommitOrAbort(ctx context.Context,
tc *dgoapi.TxnContext) (*dgoapi.TxnContext, error) {
return &dgoapi.TxnContext{}, nil
}

// clientInfoLogin check whether the client info(IP address) is propagated in the request.
Expand Down
173 changes: 173 additions & 0 deletions graphql/e2e/common/lambda.go
Original file line number Diff line number Diff line change
Expand Up @@ -17,8 +17,14 @@
package common

import (
"context"
"encoding/json"
"fmt"
"io/ioutil"
"net/http"
"strings"
"testing"
"time"

"github.com/stretchr/testify/require"

Expand Down Expand Up @@ -339,3 +345,170 @@ func lambdaWithApolloFederation(t *testing.T) {
DeleteGqlType(t, "Astronaut", map[string]interface{}{"id": []interface{}{"14", "30", "7"}}, 3,
nil)
}

// TODO(GRAPHQL-1123): need to find a way to make it work on TeamCity machines.
// The host `172.17.0.1` used to connect to host machine from within docker, doesn't seem to
// work in teamcity machines, neither does `host.docker.internal` works there. So, we are
// skipping the related test for now.
func lambdaOnMutateHooks(t *testing.T) {
t.Skipf("can't reach host machine from within docker")
// let's listen to the changes coming in from the lambda hook and store them in this array
var changelog []string
server := http.Server{Addr: lambdaHookServerAddr, Handler: http.NewServeMux()}
defer server.Shutdown(context.Background())
go func() {
serverMux := server.Handler.(*http.ServeMux)
serverMux.HandleFunc("/changelog", func(w http.ResponseWriter, r *http.Request) {
defer r.Body.Close()
b, err := ioutil.ReadAll(r.Body)
require.NoError(t, err)

var event map[string]interface{}
require.NoError(t, json.Unmarshal(b, &event))
require.Greater(t, event["commitTs"], float64(0))
delete(event, "commitTs")

b, err = json.Marshal(event)
require.NoError(t, err)

changelog = append(changelog, string(b))
})
t.Log(server.ListenAndServe())
}()

// wait a bit to make sure the server has started
time.Sleep(2 * time.Second)

// 1. Add 2 districts: D1, D2
addDistrictParams := &GraphQLParams{
Query: `mutation ($input: [AddDistrictInput!]!, $upsert: Boolean){
addDistrict(input: $input, upsert: $upsert) {
district {
dgId
id
}
}
}`,
Variables: map[string]interface{}{
"input": []interface{}{
map[string]interface{}{"id": "D1", "name": "Dist-1"},
map[string]interface{}{"id": "D2", "name": "Dist-2"},
},
"upsert": false,
},
}
resp := addDistrictParams.ExecuteAsPost(t, GraphqlURL)
resp.RequireNoGQLErrors(t)

var addResp struct {
AddDistrict struct{ District []struct{ DgId, Id string } }
}
require.NoError(t, json.Unmarshal(resp.Data, &addResp))
require.Len(t, addResp.AddDistrict.District, 2)

// find the uid for each district, to be used later in comparing expectation with reality
var d1Uid, d2Uid string
for _, dist := range addResp.AddDistrict.District {
switch dist.Id {
case "D1":
d1Uid = dist.DgId
case "D2":
d2Uid = dist.DgId
}
}

// 2. Upsert the district D1 with an updated name
addDistrictParams.Variables = map[string]interface{}{
"input": []interface{}{
map[string]interface{}{"id": "D1", "name": "Dist_1"},
},
"upsert": true,
}
resp = addDistrictParams.ExecuteAsPost(t, GraphqlURL)
resp.RequireNoGQLErrors(t)

// 3. Update the name for district D2
updateDistrictParams := &GraphQLParams{
Query: `mutation {
updateDistrict(input: {
filter: { id: {eq: "D2"}}
set: {name: "Dist_2"}
remove: {name: "Dist-2"}
}) {
numUids
}
}`,
}
resp = updateDistrictParams.ExecuteAsPost(t, GraphqlURL)
resp.RequireNoGQLErrors(t)

// 4. Delete both the Districts
DeleteGqlType(t, "District", GetXidFilter("id", []interface{}{"D1", "D2"}), 2, nil)

// let's wait for at least 5 secs to get all the updates from the lambda hook
time.Sleep(5 * time.Second)

// compare the expected vs the actual ones
testutil.CompareJSON(t, fmt.Sprintf(`{"changelog": [
{
"__typename": "District",
"operation": "add",
"add": {
"rootUIDs": [
"%s",
"%s"
],
"input": [
{
"id": "D1",
"name": "Dist-1"
},
{
"id": "D2",
"name": "Dist-2"
}
]
}
},
{
"__typename": "District",
"operation": "add",
"add": {
"rootUIDs": [
"%s"
],
"input": [
{
"name": "Dist_1"
}
]
}
},
{
"__typename": "District",
"operation": "update",
"update": {
"rootUIDs": [
"%s"
],
"setPatch": {
"name": "Dist_2"
},
"removePatch": {
"name": "Dist-2"
}
}
},
{
"__typename": "District",
"operation": "delete",
"delete": {
"rootUIDs": [
"%s",
"%s"
]
}
}
]}`, d1Uid, d2Uid, d1Uid, d2Uid, d1Uid, d2Uid),
`{"changelog": [`+strings.Join(changelog, ",")+"]}")
}
3 changes: 2 additions & 1 deletion graphql/e2e/directives/schema.graphql
Original file line number Diff line number Diff line change
Expand Up @@ -341,7 +341,8 @@ type Region {
district: District
}

type District {
type District @lambdaOnMutate(add: true, update: true, delete: true) {
dgId: ID!
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

nit: Indentation seems off. Likely due to different tab spaces being used.

id: String! @id
name: String!
}
Expand Down
Loading