Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Verify that all the fields in a type exist in the schema. #4114

Merged
merged 10 commits into from
Oct 8, 2019
12 changes: 11 additions & 1 deletion dgraph/cmd/alpha/run_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -1225,6 +1225,16 @@ func TestListTypeSchemaChange(t *testing.T) {

func TestDeleteAllSP2(t *testing.T) {
s := `
nodeType: string .
name: string .
date: datetime .
weight: float .
weightUnit: string .
martinmr marked this conversation as resolved.
Show resolved Hide resolved
lifeLoad: int .
stressLevel: int .
plan: string .
postMortem: string .

type Node12345 {
nodeType
name
Expand Down Expand Up @@ -1274,7 +1284,7 @@ func TestDeleteAllSP2(t *testing.T) {

output, err := runGraphqlQuery(q)
require.NoError(t, err)
require.JSONEq(t, `{"data": {"me":[{"name":"July 3 2017","date":"2017-07-03T03:49:03+00:00","weight":"262.3","lifeLoad":"5","stressLevel":"3"}]}}`, output)
require.JSONEq(t, `{"data": {"me":[{"name":"July 3 2017","date":"2017-07-03T03:49:03Z","weight":262.3,"lifeLoad":5,"stressLevel":3}]}}`, output)

m = fmt.Sprintf(`
{
Expand Down
4 changes: 4 additions & 0 deletions query/common_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -228,6 +228,7 @@ type Node {

name : string @index(term, exact, trigram) @count @lang .
alias : string @index(exact, term, fulltext) .
abbr : string .
dob : dateTime @index(year) .
dob_day : dateTime @index(day) .
film.film.initial_release_date : dateTime @index(year) .
Expand Down Expand Up @@ -264,6 +265,9 @@ previous_model : uid @reverse .
created_at : datetime @index(hour) .
updated_at : datetime @index(year) .
number : int @index(int) .
district : [uid] .
state : [uid] .
county : [uid] .
firstName : string .
lastName : string .
`
Expand Down
2 changes: 2 additions & 0 deletions systest/bulk_live_cases_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -270,6 +270,8 @@ func TestCountIndex(t *testing.T) {

func TestLoadTypes(t *testing.T) {
s := newSuite(t, `
name: string .

type Person {
name
}
Expand Down
5 changes: 5 additions & 0 deletions systest/mutations_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -832,6 +832,9 @@ func EmptyRoomsWithTermIndex(t *testing.T, c *dgo.Dgraph) {
func DeleteWithExpandAll(t *testing.T, c *dgo.Dgraph) {
op := &api.Operation{}
op.Schema = `
to: [uid] .
name: string .

type Node {
to
name
Expand Down Expand Up @@ -1564,6 +1567,8 @@ func DropType(t *testing.T, c *dgo.Dgraph) {

require.NoError(t, c.Alter(ctx, &api.Operation{
Schema: `
name: string .

type Person {
name
}
Expand Down
1 change: 0 additions & 1 deletion worker/draft.go
Original file line number Diff line number Diff line change
Expand Up @@ -209,7 +209,6 @@ func (n *node) applyMutations(ctx context.Context, proposal *pb.Proposal) (rerr
return err
}
}

return nil
}

Expand Down
94 changes: 64 additions & 30 deletions worker/mutation.go
Original file line number Diff line number Diff line change
Expand Up @@ -210,11 +210,7 @@ func createSchema(attr string, typ types.TypeID) {
}

func runTypeMutation(ctx context.Context, update *pb.TypeUpdate) error {
if err := checkType(update); err != nil {
return err
}
current := *update

schema.State().SetType(update.TypeName, current)
return updateType(update.TypeName, *update)
}
Expand Down Expand Up @@ -324,32 +320,6 @@ func checkSchema(s *pb.SchemaUpdate) error {
return nil
}

func checkType(t *pb.TypeUpdate) error {
if len(t.TypeName) == 0 {
return errors.Errorf("Type name must be specified in type update")
}

for _, field := range t.Fields {
if len(field.Predicate) == 0 {
return errors.Errorf("Field in type definition must have a name")
}

if field.ValueType == pb.Posting_OBJECT && len(field.ObjectTypeName) == 0 {
return errors.Errorf("Field with value type OBJECT must specify the name of the object type")
}

if field.Directive != pb.SchemaUpdate_NONE {
return errors.Errorf("Field in type definition cannot have a directive")
}

if len(field.Tokenizer) > 0 {
return errors.Errorf("Field in type definition cannot have tokenizers")
}
}

return nil
}

// ValidateAndConvert checks compatibility or converts to the schema type if the storage type is
// specified. If no storage type is specified then it converts to the schema type.
func ValidateAndConvert(edge *pb.DirectedEdge, su *pb.SchemaUpdate) error {
Expand Down Expand Up @@ -551,6 +521,9 @@ func MutateOverNetwork(ctx context.Context, m *pb.Mutations) (*api.TxnContext, e
defer span.End()

tctx := &api.TxnContext{StartTs: m.StartTs}
if err := verifyTypes(ctx, m); err != nil {
return tctx, err
}
mutationMap, err := populateMutationMap(m)
if err != nil {
return tctx, err
Expand Down Expand Up @@ -584,6 +557,67 @@ func MutateOverNetwork(ctx context.Context, m *pb.Mutations) (*api.TxnContext, e
return tctx, e
}

func verifyTypes(ctx context.Context, m *pb.Mutations) error {
preds := make(map[string]struct{}, len(m.Schema))
for _, sup := range m.Schema {
martinmr marked this conversation as resolved.
Show resolved Hide resolved
preds[sup.Predicate] = struct{}{}
}

for _, t := range m.Types {
if len(t.TypeName) == 0 {
return errors.Errorf("Type name must be specified in type update")
}

fields := make([]string, len(t.Fields))
for i, field := range t.Fields {
fields[i] = field.Predicate
}
schs, err := GetSchemaOverNetwork(ctx, &pb.SchemaRequest{Predicates: fields})
if err != nil {
return errors.Errorf("Cannot retrieve predicate information for fields in type %s",
t.TypeName)
}

schemaSet := make(map[string]struct{}, len(schs))
for _, schemaNode := range schs {
schemaSet[schemaNode.Predicate] = struct{}{}
}

// Verify all the fields in the type are already on the schema or come included in
// this request.
for _, field := range fields {
_, inSchema := schemaSet[field]
_, inRequest := preds[field]
if !inSchema && !inRequest {
martinmr marked this conversation as resolved.
Show resolved Hide resolved
return errors.Errorf(
"Schema does not contain a matching predicate for field %s in type %s",
field, t.TypeName)
}
}

for _, field := range t.Fields {
if len(field.Predicate) == 0 {
return errors.Errorf("Field in type definition must have a name")
}

if field.ValueType == pb.Posting_OBJECT && len(field.ObjectTypeName) == 0 {
return errors.Errorf(
"Field with value type OBJECT must specify the name of the object type")
}

if field.Directive != pb.SchemaUpdate_NONE {
return errors.Errorf("Field in type definition cannot have a directive")
}

if len(field.Tokenizer) > 0 {
return errors.Errorf("Field in type definition cannot have tokenizers")
}
}
}

return nil
}

// CommitOverNetwork makes a proxy call to Zero to commit or abort a transaction.
func CommitOverNetwork(ctx context.Context, tc *api.TxnContext) (uint64, error) {
ctx, span := otrace.StartSpan(ctx, "worker.CommitOverNetwork")
Expand Down
6 changes: 1 addition & 5 deletions worker/proposal.go
Original file line number Diff line number Diff line change
Expand Up @@ -159,6 +159,7 @@ func (n *node) proposeAndWait(ctx context.Context, proposal *pb.Proposal) (perr
return err
}
}

for _, schema := range proposal.Mutations.Schema {
if err := checkTablet(schema.Predicate); err != nil {
return err
Expand All @@ -168,11 +169,6 @@ func (n *node) proposeAndWait(ctx context.Context, proposal *pb.Proposal) (perr
}
noTimeout = true
}
for _, typ := range proposal.Mutations.Types {
if err := checkType(typ); err != nil {
return err
}
}
}

// Let's keep the same key, so multiple retries of the same proposal would
Expand Down