Skip to content

graphql: Fix non-unique schema issue (#5054) #5481

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Merged
merged 1 commit into from
May 20, 2020
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions dgraph/cmd/bulk/systest/test-bulk-schema.sh
Original file line number Diff line number Diff line change
Expand Up @@ -201,6 +201,7 @@ EOF
diff <(LC_ALL=C sort all_dbs.out | uniq -c) - <<EOF
1 dgraph.acl.rule
1 dgraph.graphql.schema
1 dgraph.graphql.xid
1 dgraph.password
1 dgraph.rule.permission
1 dgraph.rule.predicate
Expand Down
3 changes: 2 additions & 1 deletion edgraph/server.go
Original file line number Diff line number Diff line change
Expand Up @@ -67,7 +67,8 @@ const (
type GraphqlContextKey int

const (
// IsGraphql is used to validate requests which are allowed to mutate dgraph.graphql.schema.
// IsGraphql is used to validate requests which are allowed to mutate GraphQL reserved
// predicates, like dgraph.graphql.schema and dgraph.graphql.xid.
IsGraphql GraphqlContextKey = iota
// Authorize is used to set if the request requires validation.
Authorize
Expand Down
184 changes: 128 additions & 56 deletions graphql/admin/admin.go
Original file line number Diff line number Diff line change
Expand Up @@ -18,11 +18,13 @@ package admin

import (
"context"
"encoding/json"
"fmt"
"sync"
"time"

dgoapi "github.com/dgraph-io/dgo/v2/protos/api"
"github.com/dgraph-io/dgraph/gql"

"github.com/golang/glog"
"github.com/pkg/errors"

Expand All @@ -45,6 +47,10 @@ const (
"this indicates a resolver or validation bug " +
"(Please let us know : https://github.com/dgraph-io/dgraph/issues)"

gqlSchemaXidKey = "dgraph.graphql.xid"
gqlSchemaXidVal = "dgraph.graphql.schema"
gqlSchemaPred = "dgraph.graphql.schema"

// GraphQL schema for /admin endpoint.
graphqlAdminSchema = `
"""
Expand All @@ -56,7 +62,7 @@ const (
"""
Input schema (GraphQL types) that was used in the latest schema update.
"""
schema: String! @dgraph(type: "dgraph.graphql.schema")
schema: String! @dgraph(pred: "dgraph.graphql.schema")

"""
The GraphQL schema that was generated from the 'schema' field.
Expand Down Expand Up @@ -282,7 +288,7 @@ type adminServer struct {
// The GraphQL server that's being admin'd
gqlServer web.IServeGraphQL

schema gqlSchema
schema *gqlSchema

// When the schema changes, we use these to create a new RequestResolver for
// the main graphql endpoint (gqlServer) and thus refresh the API.
Expand Down Expand Up @@ -335,7 +341,7 @@ func newAdminResolver(
withIntrospection: withIntrospection,
}

prefix := x.DataKey("dgraph.graphql.schema", 0)
prefix := x.DataKey(gqlSchemaPred, 0)
// Remove uid from the key, to get the correct prefix
prefix = prefix[:len(prefix)-8]
// Listen for graphql schema changes in group 1.
Expand Down Expand Up @@ -367,31 +373,24 @@ func newAdminResolver(
return
}

newSchema := gqlSchema{
newSchema := &gqlSchema{
ID: fmt.Sprintf("%#x", pk.Uid),
Schema: string(pl.Postings[0].Value),
}

schHandler, err := schema.NewHandler(newSchema.Schema)
if err != nil {
glog.Errorf("Error processing GraphQL schema: %s. ", err)
return
}

newSchema.GeneratedSchema = schHandler.GQLSchema()
gqlSchema, err := schema.FromString(newSchema.GeneratedSchema)
gqlSchema, err := generateGQLSchema(newSchema)
if err != nil {
glog.Errorf("Error processing GraphQL schema: %s. ", err)
return
}

glog.Infof("Successfully updated GraphQL schema.")
glog.Infof("Successfully updated GraphQL schema. Serving New GraphQL API.")

server.mux.Lock()
defer server.mux.Unlock()

server.schema = newSchema
server.resetSchema(gqlSchema)
server.resetSchema(*gqlSchema)
}, 1, closer)

go server.initServer()
Expand Down Expand Up @@ -444,51 +443,145 @@ func newAdminResolverFactory() resolve.ResolverFactory {
return rf.WithSchemaIntrospection()
}

func (as *adminServer) initServer() {
// It takes a few seconds for the Dgraph cluster to be up and running.
// Before that, trying to read the GraphQL schema will result in error:
// "Please retry again, server is not ready to accept requests."
// 5 seconds is a pretty reliable wait for a fresh instance to read the
// schema on a first try.
waitFor := 5 * time.Second
func upsertEmptyGQLSchema() (*gqlSchema, error) {
existingSchemaVar := "ExistingGQLSchema"
xidInSchemaVar := "XidInSchema"
gqlType := "dgraph.graphql"
/*
query {
ExistingGQLSchema as ExistingGQLSchema(func: type(dgraph.graphql)) {
uid
dgraph.graphql.schema
XidInSchema as dgraph.graphql.xid
}
}
*/
qry := &gql.GraphQuery{
Attr: existingSchemaVar,
Var: existingSchemaVar,
Func: &gql.Function{
Name: "type",
Args: []gql.Arg{{Value: gqlType}},
},
Children: []*gql.GraphQuery{
{Attr: "uid"},
{Attr: gqlSchemaPred},
{Attr: gqlSchemaXidKey, Var: xidInSchemaVar},
},
}
/*
mutation @if(eq(len(ExistingGQLSchema),1) AND eq(len(XidInSchema),0)) {
set {
"uid": "uid(ExistingGQLSchema)",
"dgraph.graphql.xid": "dgraph.graphql.schema"
}
}
mutation @if(eq(len(ExistingGQLSchema),0) AND eq(len(XidInSchema),0))
set {
"uid": "_:XidInSchema",
"dgraph.type": ["dgraph.graphql"],
"dgraph.graphql.xid": "dgraph.graphql.schema",
"dgraph.graphql.schema": ""
}
}
*/
mutations := []*dgoapi.Mutation{
{
SetJson: []byte(fmt.Sprintf(`
{
"uid": "uid(%s)",
"%s": "%s"
}`, existingSchemaVar, gqlSchemaXidKey, gqlSchemaXidVal)),
Cond: fmt.Sprintf(`@if(eq(len(%s),1) AND eq(len(%s),0))`, existingSchemaVar,
xidInSchemaVar),
},
{
SetJson: []byte(fmt.Sprintf(`
{
"uid": "_:%s",
"dgraph.type": ["%s"],
"%s": "%s",
"%s": ""
}`, xidInSchemaVar, gqlType, gqlSchemaXidKey, gqlSchemaXidVal, gqlSchemaPred)),
Cond: fmt.Sprintf(`@if(eq(len(%s),0) AND eq(len(%s),0))`, existingSchemaVar,
xidInSchemaVar),
},
}
assigned, result, err := resolve.AdminMutationExecutor().Mutate(context.Background(), qry,
mutations)
if err != nil {
return nil, err
}
// the Alpha which created a new gql schema node with xid will get the uid here
uid, ok := assigned[xidInSchemaVar]
if ok {
return &gqlSchema{ID: uid}, nil
}
// the Alphas which didn't create a new gql schema node, will get the uid here.
gqlSchemaNode := result[existingSchemaVar].([]interface{})[0].(map[string]interface{})
return &gqlSchema{
ID: gqlSchemaNode["uid"].(string),
Schema: gqlSchemaNode[gqlSchemaPred].(string),
}, nil
}

func generateGQLSchema(sch *gqlSchema) (*schema.Schema, error) {
schHandler, err := schema.NewHandler(sch.Schema)
if err != nil {
return nil, err
}

sch.GeneratedSchema = schHandler.GQLSchema()
generatedSchema, err := schema.FromString(sch.GeneratedSchema)
if err != nil {
return nil, err
}

return &generatedSchema, nil
}

func (as *adminServer) initServer() {
// Nothing else should be able to lock before here. The admin resolvers aren't yet
// set up (they all just error), so we will obtain the lock here without contention.
// We then setup the admin resolvers and they must wait until we are done before the
// first admin calls will go through.
as.mux.Lock()
defer as.mux.Unlock()

as.addConnectedAdminResolvers()
// It takes a few seconds for the Dgraph cluster to be up and running.
// Before that, trying to read the GraphQL schema will result in error:
// "Please retry again, server is not ready to accept requests."
// 5 seconds is a pretty reliable wait for a fresh instance to read the
// schema on a first try.
waitFor := 5 * time.Second

for {
<-time.After(waitFor)

sch, err := getCurrentGraphQLSchema(as.resolver)
sch, err := upsertEmptyGQLSchema()
if err != nil {
glog.Infof("Error reading GraphQL schema: %s.", err)
continue
} else if sch == nil {
glog.Infof("No GraphQL schema in Dgraph; serving empty GraphQL API")
break
}

schHandler, err := schema.NewHandler(sch.Schema)
if err != nil {
glog.Infof("Error processing GraphQL schema: %s.", err)
as.schema = sch
// adding the actual resolvers for updateGQLSchema and getGQLSchema only after server has ID
as.addConnectedAdminResolvers()

if sch.Schema == "" {
glog.Infof("No GraphQL schema in Dgraph; serving empty GraphQL API")
break
}

sch.GeneratedSchema = schHandler.GQLSchema()
generatedSchema, err := schema.FromString(sch.GeneratedSchema)
generatedSchema, err := generateGQLSchema(sch)
if err != nil {
glog.Infof("Error processing GraphQL schema: %s.", err)
break
}

glog.Infof("Successfully loaded GraphQL schema. Serving GraphQL API.")

as.schema = *sch
as.resetSchema(generatedSchema)
as.resetSchema(*generatedSchema)

break
}
Expand All @@ -498,7 +591,6 @@ func (as *adminServer) initServer() {
func (as *adminServer) addConnectedAdminResolvers() {

qryRw := resolve.NewQueryRewriter()
addRw := resolve.NewAddRewriter()
updRw := resolve.NewUpdateRewriter()
qryExec := resolve.DgraphAsQueryExecutor()
mutExec := resolve.DgraphAsMutationExecutor()
Expand All @@ -510,7 +602,6 @@ func (as *adminServer) addConnectedAdminResolvers() {
func(m schema.Mutation) resolve.MutationResolver {
updResolver := &updateSchemaResolver{
admin: as,
baseAddRewriter: addRw,
baseMutationRewriter: updRw,
baseMutationExecutor: mutExec,
}
Expand All @@ -524,9 +615,7 @@ func (as *adminServer) addConnectedAdminResolvers() {
WithQueryResolver("getGQLSchema",
func(q schema.Query) resolve.QueryResolver {
getResolver := &getSchemaResolver{
admin: as,
baseRewriter: qryRw,
baseExecutor: resolve.AdminQueryExecutor(),
admin: as,
}

return resolve.NewQueryResolver(
Expand Down Expand Up @@ -623,23 +712,6 @@ func (as *adminServer) addConnectedAdminResolvers() {
})
}

func getCurrentGraphQLSchema(r *resolve.RequestResolver) (*gqlSchema, error) {
req := &schema.Request{
Query: `query { getGQLSchema { id schema } }`}
resp := r.Resolve(context.Background(), req)
if len(resp.Errors) > 0 || resp.Data.Len() == 0 {
return nil, resp.Errors
}

var result struct {
GetGQLSchema *gqlSchema
}

err := json.Unmarshal(resp.Data.Bytes(), &result)

return result.GetGQLSchema, err
}

func resolverFactoryWithErrorMsg(msg string) resolve.ResolverFactory {
errFunc := func(name string) error { return errors.Errorf(msg, name) }
qErr :=
Expand Down
Loading