Skip to content

Commit

Permalink
fix(bulk): throw the error instead of crashing (#7722)
Browse files Browse the repository at this point in the history
(cherry picked from commit a4d74d4)
  • Loading branch information
NamanJain8 committed Apr 26, 2021
1 parent f6e6c18 commit 1fe9c72
Show file tree
Hide file tree
Showing 3 changed files with 11 additions and 12 deletions.
12 changes: 6 additions & 6 deletions dgraph/cmd/bulk/mapper.go
Original file line number Diff line number Diff line change
Expand Up @@ -311,14 +311,14 @@ func (m *mapper) processNQuad(nq gql.NQuad) {
m.schema.checkAndSetInitialSchema(nq.Namespace)

// Appropriate schema must exist for the nquad's namespace by this time.
attr := x.NamespaceAttr(nq.Namespace, nq.Predicate)
de.Attr = x.NamespaceAttr(de.Namespace, de.Attr)
fwd, rev := m.createPostings(nq, de)
shard := m.state.shards.shardFor(attr)
key := x.DataKey(attr, sid)
shard := m.state.shards.shardFor(de.Attr)
key := x.DataKey(de.Attr, sid)
m.addMapEntry(key, fwd, shard)

if rev != nil {
key = x.ReverseKey(attr, oid)
key = x.ReverseKey(de.Attr, oid)
m.addMapEntry(key, rev, shard)
}
m.addIndexMapEntries(nq, de)
Expand Down Expand Up @@ -374,7 +374,7 @@ func (m *mapper) lookupUid(xid string, ns uint64) uint64 {
func (m *mapper) createPostings(nq gql.NQuad,
de *pb.DirectedEdge) (*pb.Posting, *pb.Posting) {

m.schema.validateType(de, nq.Namespace, nq.ObjectValue == nil)
m.schema.validateType(de, nq.ObjectValue == nil)

p := posting.NewPosting(de)
sch := m.schema.getSchema(x.NamespaceAttr(nq.GetNamespace(), nq.GetPredicate()))
Expand All @@ -399,7 +399,7 @@ func (m *mapper) createPostings(nq gql.NQuad,
// Reverse predicate
x.AssertTruef(nq.GetObjectValue() == nil, "only has reverse schema if object is UID")
de.Entity, de.ValueId = de.ValueId, de.Entity
m.schema.validateType(de, nq.Namespace, true)
m.schema.validateType(de, true)
rp := posting.NewPosting(de)

de.Entity, de.ValueId = de.ValueId, de.Entity // de reused so swap back.
Expand Down
9 changes: 4 additions & 5 deletions dgraph/cmd/bulk/schema.go
Original file line number Diff line number Diff line change
Expand Up @@ -113,24 +113,23 @@ func (s *schemaStore) checkAndSetInitialSchema(namespace uint64) {
return
}

func (s *schemaStore) validateType(de *pb.DirectedEdge, namespace uint64, objectIsUID bool) {
func (s *schemaStore) validateType(de *pb.DirectedEdge, objectIsUID bool) {
if objectIsUID {
de.ValueType = pb.Posting_UID
}

attr := x.NamespaceAttr(namespace, de.Attr)
s.RLock()
sch, ok := s.schemaMap[attr]
sch, ok := s.schemaMap[de.Attr]
s.RUnlock()
if !ok {
s.Lock()
sch, ok = s.schemaMap[attr]
sch, ok = s.schemaMap[de.Attr]
if !ok {
sch = &pb.SchemaUpdate{ValueType: de.ValueType}
if objectIsUID {
sch.List = true
}
s.schemaMap[attr] = sch
s.schemaMap[de.Attr] = sch
}
s.Unlock()
}
Expand Down
2 changes: 1 addition & 1 deletion worker/mutation.go
Original file line number Diff line number Diff line change
Expand Up @@ -471,7 +471,7 @@ func ValidateAndConvert(edge *pb.DirectedEdge, su *pb.SchemaUpdate) error {
switch {
case edge.Lang != "" && !su.GetLang():
return errors.Errorf("Attr: [%v] should have @lang directive in schema to mutate edge: [%v]",
edge.Attr, edge)
x.ParseAttr(edge.Attr), edge)

case !schemaType.IsScalar() && !storageType.IsScalar():
return nil
Expand Down

0 comments on commit 1fe9c72

Please sign in to comment.