Skip to content

Commit

Permalink
cherry-pick: fix: fixing graphql schema update when the data is resto…
Browse files Browse the repository at this point in the history
…red + skippin… (#7970)

* fix: fixing graphql schema update when the data is restored + skipping /probe/graphql from audit (#7925)

* fix: fixing grapgql schema update when the data is restored

* making audit to skip /probe/graphql endpoint as this is health endpoint for kube

(cherry picked from commit b85036e)

Co-authored-by: aman bansal <[email protected]>
  • Loading branch information
ahsanbarkati and aman-bansal authored Aug 2, 2021
1 parent 357d81c commit 6df546f
Show file tree
Hide file tree
Showing 5 changed files with 80 additions and 34 deletions.
5 changes: 3 additions & 2 deletions ee/audit/interceptor_ee.go
Original file line number Diff line number Diff line change
Expand Up @@ -63,8 +63,9 @@ var skipApis = map[string]bool{

var skipEPs = map[string]bool{
// list of endpoints that needs to be skipped
"/health": true,
"/state": true,
"/health": true,
"/state": true,
"/probe/graphql": true,
}

func AuditRequestGRPC(ctx context.Context, req interface{},
Expand Down
52 changes: 24 additions & 28 deletions graphql/admin/admin.go
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@ package admin

import (
"context"
"fmt"
"sync"
"sync/atomic"
"time"
Expand Down Expand Up @@ -612,14 +613,6 @@ func (g *GraphQLHealthStore) updatingSchema() {
g.v.Store(GraphQLHealth{Healthy: true, StatusMsg: "updating schema"})
}

type gqlSchema struct {
ID string `json:"id,omitempty"`
Schema string `json:"schema,omitempty"`
Version uint64
GeneratedSchema string
loaded bool // This indicate whether the schema has been loaded into graphql server or not
}

type adminServer struct {
rf resolve.ResolverFactory
resolver *resolve.RequestResolver
Expand All @@ -630,8 +623,7 @@ type adminServer struct {
// The GraphQL server that's being admin'd
gqlServer IServeGraphQL

schema map[uint64]*gqlSchema

gqlSchemas *worker.GQLSchemaStore
// When the schema changes, we use these to create a new RequestResolver for
// the main graphql endpoint (gqlServer) and thus refresh the API.
fns *resolve.ResolverFns
Expand Down Expand Up @@ -689,7 +681,7 @@ func newAdminResolver(
fns: fns,
withIntrospection: withIntrospection,
globalEpoch: epoch,
schema: make(map[uint64]*gqlSchema),
gqlSchemas: worker.NewGQLSchemaStore(),
gqlServer: defaultGqlServer,
}
adminServerVar = server // store the admin server in package variable
Expand Down Expand Up @@ -728,7 +720,7 @@ func newAdminResolver(
var data x.GQL
data.Schema, data.Script = worker.ParseAsSchemaAndScript(pl.Postings[0].Value)

newSchema := &gqlSchema{
newSchema := &worker.GqlSchema{
ID: query.UidToHex(pk.Uid),
Version: kv.GetVersion(),
Schema: data.Schema,
Expand All @@ -743,7 +735,7 @@ func newAdminResolver(
currentScript = script.Script
}
server.mux.RLock()
currentSchema, ok := server.schema[ns]
currentSchema, ok := server.gqlSchemas.GetCurrent(ns)
if ok {
schemaNotChanged := newSchema.Schema == currentSchema.Schema
scriptNotChanged := newScript.Script == currentScript
Expand Down Expand Up @@ -773,19 +765,19 @@ func newAdminResolver(

server.incrementSchemaUpdateCounter(ns)
// if the schema hasn't been loaded yet, then we don't need to load it here
currentSchema, ok = server.schema[ns]
if !(ok && currentSchema.loaded) {
currentSchema, ok = server.gqlSchemas.GetCurrent(ns)
if !(ok && currentSchema.Loaded) {
// this just set schema in admin server, so that next invalid badger subscription update gets rejected upfront
server.schema[ns] = newSchema
worker.Lambda().Set(ns, newScript)
server.gqlSchemas.Set(ns, newSchema)
glog.Infof("namespace: %d. Skipping in-memory GraphQL schema update, "+
"it will be lazy-loaded later.", ns)
return
}

// update this schema in both admin and graphql server
newSchema.loaded = true
server.schema[ns] = newSchema
newSchema.Loaded = true
server.gqlSchemas.Set(ns, newSchema)
server.resetSchema(ns, gqlSchema)
// Update the lambda script
worker.Lambda().Set(ns, newScript)
Expand Down Expand Up @@ -866,16 +858,16 @@ func newAdminResolverFactory() resolve.ResolverFactory {
return rf.WithSchemaIntrospection()
}

func getCurrentGraphQLSchema(namespace uint64) (*gqlSchema, error) {
func getCurrentGraphQLSchema(namespace uint64) (*worker.GqlSchema, error) {
uid, graphQLSchema, err := edgraph.GetGQLSchema(namespace)
if err != nil {
return nil, err
}

return &gqlSchema{ID: uid, Schema: graphQLSchema}, nil
return &worker.GqlSchema{ID: uid, Schema: graphQLSchema}, nil
}

func generateGQLSchema(sch *gqlSchema, ns uint64) (schema.Schema, error) {
func generateGQLSchema(sch *worker.GqlSchema, ns uint64) (schema.Schema, error) {
schHandler, err := schema.NewHandler(sch.Schema, false)
if err != nil {
return nil, err
Expand Down Expand Up @@ -913,8 +905,8 @@ func (as *adminServer) initServer() {
glog.Errorf("namespace: %d. Error reading GraphQL schema: %s.", x.GalaxyNamespace, err)
continue
}
sch.loaded = true
as.schema[x.GalaxyNamespace] = sch
sch.Loaded = true
as.gqlSchemas.Set(x.GalaxyNamespace, sch)
// adding the actual resolvers for updateGQLSchema and getGQLSchema only after server has
// current GraphQL schema, if there was any.
as.addConnectedAdminResolvers()
Expand Down Expand Up @@ -1054,8 +1046,12 @@ func (as *adminServer) resetSchema(ns uint64, gqlSchema schema.Schema) {
return resolve.QueryResolverFunc(func(ctx context.Context, query schema.Query) *resolve.Resolved {
as.mux.RLock()
defer as.mux.RUnlock()
sch := as.schema[ns].Schema
handler, err := schema.NewHandler(sch, true)
sch, ok := as.gqlSchemas.GetCurrent(ns)
if !ok {
return resolve.EmptyResult(query,
fmt.Errorf("error while getting the schema for ns %d", ns))
}
handler, err := schema.NewHandler(sch.Schema, true)
if err != nil {
return resolve.EmptyResult(query, err)
}
Expand All @@ -1082,7 +1078,7 @@ func (as *adminServer) resetSchema(ns uint64, gqlSchema schema.Schema) {
func (as *adminServer) lazyLoadSchema(namespace uint64) error {
// if the schema is already in memory, no need to fetch it from disk
as.mux.RLock()
if currentSchema, ok := as.schema[namespace]; ok && currentSchema.loaded {
if currentSchema, ok := as.gqlSchemas.GetCurrent(namespace); ok && currentSchema.Loaded {
as.mux.RUnlock()
return nil
}
Expand Down Expand Up @@ -1112,8 +1108,8 @@ func (as *adminServer) lazyLoadSchema(namespace uint64) error {

as.mux.Lock()
defer as.mux.Unlock()
sch.loaded = true
as.schema[namespace] = sch
sch.Loaded = true
as.gqlSchemas.Set(namespace, sch)
as.resetSchema(namespace, generatedSchema)

glog.Infof("namespace: %d. Successfully lazy-loaded GraphQL schema.", namespace)
Expand Down
5 changes: 3 additions & 2 deletions graphql/admin/schema.go
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@ package admin
import (
"context"
"encoding/json"
"github.com/dgraph-io/dgraph/worker"

"github.com/dgraph-io/dgraph/edgraph"
"github.com/dgraph-io/dgraph/graphql/resolve"
Expand All @@ -33,7 +34,7 @@ type getSchemaResolver struct {
}

type updateGQLSchemaInput struct {
Set gqlSchema `json:"set,omitempty"`
Set worker.GqlSchema `json:"set,omitempty"`
}

type updateSchemaResolver struct {
Expand Down Expand Up @@ -88,7 +89,7 @@ func (gsr *getSchemaResolver) Resolve(ctx context.Context, q schema.Query) *reso
return resolve.EmptyResult(q, err)
}

cs := gsr.admin.schema[ns]
cs, _ := gsr.admin.gqlSchemas.GetCurrent(ns)
if cs == nil || cs.ID == "" {
data = map[string]interface{}{q.Name(): nil}
} else {
Expand Down
47 changes: 47 additions & 0 deletions worker/graphql_schema.go
Original file line number Diff line number Diff line change
Expand Up @@ -50,8 +50,55 @@ var (
errUpdatingGraphQLSchemaOnNonGroupOneLeader = errors.New(
"while updating GraphQL schema: this server isn't group-1 leader, please retry")
ErrMultipleGraphQLSchemaNodes = errors.New("found multiple nodes for GraphQL schema")
gqlSchemaStore *GQLSchemaStore
)

type GqlSchema struct {
ID string `json:"id,omitempty"`
Schema string `json:"schema,omitempty"`
Version uint64
GeneratedSchema string
Loaded bool // This indicate whether the schema has been loaded into graphql server
// or not
}

type GQLSchemaStore struct {
mux sync.RWMutex
schema map[uint64]*GqlSchema
}

func NewGQLSchemaStore() *GQLSchemaStore {
gqlSchemaStore = &GQLSchemaStore{
mux: sync.RWMutex{},
schema: make(map[uint64]*GqlSchema),
}
return gqlSchemaStore
}

func (gs *GQLSchemaStore) Set(ns uint64, sch *GqlSchema) {
gs.mux.Lock()
defer gs.mux.Unlock()
gs.schema[ns] = sch
}

func (gs *GQLSchemaStore) GetCurrent(ns uint64) (*GqlSchema, bool) {
gs.mux.RLock()
defer gs.mux.RUnlock()
sch, ok := gs.schema[ns]
return sch, ok
}

func (gs *GQLSchemaStore) resetGQLSchema() {
gs.mux.Lock()
defer gs.mux.Unlock()

gs.schema = make(map[uint64]*GqlSchema)
}

func ResetGQLSchemaStore() {
gqlSchemaStore.resetGQLSchema()
}

// UpdateGQLSchemaOverNetwork sends the request to the group one leader for execution.
func UpdateGQLSchemaOverNetwork(ctx context.Context, req *pb.UpdateGraphQLSchemaRequest) (*pb.
UpdateGraphQLSchemaResponse, error) {
Expand Down
5 changes: 3 additions & 2 deletions worker/online_restore.go
Original file line number Diff line number Diff line change
Expand Up @@ -360,9 +360,10 @@ func handleRestoreProposal(ctx context.Context, req *pb.RestoreRequest, pidx uin
return errors.Wrapf(err, "cannot load schema after restore")
}

// Reset the lambda script store.
// reset gql schema and lambda script
glog.Info("reseting local gql schema and lambda script store")
ResetGQLSchemaStore()
ResetLambdaScriptStore()
// TODO(Aman): Reset the graphql schema store as well after cherry-picking changes to master.

// Propose a snapshot immediately after all the work is done to prevent the restore
// from being replayed.
Expand Down

0 comments on commit 6df546f

Please sign in to comment.