From a41db408fcd1e219ce2055e72e254c872906368b Mon Sep 17 00:00:00 2001 From: Martin Martinez Rivera Date: Mon, 3 Jun 2019 12:05:30 -0700 Subject: [PATCH] Include types in results of export operation. (#3493) --- worker/export.go | 78 ++++++++++++++++++++++++++++++++++++++++--- worker/export_test.go | 73 +++++++++++++++++++++++++++++----------- 2 files changed, 126 insertions(+), 25 deletions(-) diff --git a/worker/export.go b/worker/export.go index fc1d20e7cd8..0767a9f4d2b 100644 --- a/worker/export.go +++ b/worker/export.go @@ -341,6 +341,55 @@ func toSchema(attr string, update pb.SchemaUpdate) (*bpb.KVList, error) { return listWrap(kv), nil } +func toType(attr string, update pb.TypeUpdate) (*bpb.KVList, error) { + var buf bytes.Buffer + buf.WriteString(fmt.Sprintf("type %s {\n", attr)) + for _, field := range update.Fields { + buf.WriteString(fieldToString(field)) + } + + buf.WriteString("}\n") + + kv := &bpb.KV{ + Value: buf.Bytes(), + Version: 2, // Type value + } + return listWrap(kv), nil +} + +func fieldToString(update *pb.SchemaUpdate) string { + var builder strings.Builder + builder.WriteString("\t") + builder.WriteString(update.Predicate) + builder.WriteString(": ") + + if update.List { + builder.WriteString("[") + } + + if update.ValueType == pb.Posting_OBJECT { + builder.WriteString(update.ObjectTypeName) + } else { + tid := types.TypeID(update.ValueType) + builder.WriteString(tid.Name()) + } + + if update.NonNullable { + builder.WriteString("!") + } + + if update.List { + builder.WriteString("]") + } + + if update.NonNullableList { + builder.WriteString("!") + } + + builder.WriteString("\n") + return builder.String() +} + type fileWriter struct { fd *os.File bw *bufio.Writer @@ -440,12 +489,16 @@ func export(ctx context.Context, in *pb.ExportRequest) error { if pk.Attr == "_predicate_" { return false } - if servesTablet, err := groups().ServesTablet(pk.Attr); err != nil || !servesTablet { - return false + + if !pk.IsType() { + if servesTablet, err := groups().ServesTablet(pk.Attr); err != nil || !servesTablet { + return false + } } + // We need to ensure that schema keys are separately identifiable, so they can be // written to a different file. - return pk.IsData() || pk.IsSchema() + return pk.IsData() || pk.IsSchema() || pk.IsType() } stream.KeyToList = func(key []byte, itr *badger.Iterator) (*bpb.KVList, error) { item := itr.Item() @@ -455,9 +508,10 @@ func export(ctx context.Context, in *pb.ExportRequest) error { e.uid = pk.Uid e.attr = pk.Attr + // Schema and type keys should be handled first because schema keys are also + // considered data keys. switch { case pk.IsSchema(): - // Schema should be handled first. Because schema keys are also considered data keys. var update pb.SchemaUpdate err := item.Value(func(val []byte) error { return update.Unmarshal(val) @@ -469,11 +523,24 @@ func export(ctx context.Context, in *pb.ExportRequest) error { } return toSchema(pk.Attr, update) + case pk.IsType(): + var update pb.TypeUpdate + err := item.Value(func(val []byte) error { + return update.Unmarshal(val) + }) + if err != nil { + // Let's not propagate this error. We just log this and continue onwards. + glog.Errorf("Unable to unmarshal type: %+v. Err=%v\n", pk, err) + return nil, nil + } + return toType(pk.Attr, update) + case pk.IsData(): e.pl, err = posting.ReadPostingList(key, itr) if err != nil { return nil, err } + switch in.Format { case "json": return e.toJSON() @@ -495,11 +562,12 @@ func export(ctx context.Context, in *pb.ExportRequest) error { switch kv.Version { case 1: // data writer = dataWriter - case 2: // schema + case 2: // schema and types writer = schemaWriter default: glog.Fatalf("Invalid data type found: %x", kv.Key) } + if _, err := writer.gw.Write(kv.Value); err != nil { return err } diff --git a/worker/export_test.go b/worker/export_test.go index 4dc9389dfa2..54f1af5bb0b 100644 --- a/worker/export_test.go +++ b/worker/export_test.go @@ -18,6 +18,7 @@ package worker import ( "bufio" + "bytes" "compress/gzip" "context" "io/ioutil" @@ -29,6 +30,7 @@ import ( "testing" "time" + "github.com/golang/protobuf/proto" "github.com/stretchr/testify/require" "github.com/dgraph-io/dgo/protos/api" @@ -45,6 +47,30 @@ import ( "github.com/dgraph-io/dgraph/x" ) +var personType = &pb.TypeUpdate{ + TypeName: "Person", + Fields: []*pb.SchemaUpdate{ + { + Predicate: "name", + ValueType: pb.Posting_STRING, + NonNullable: true, + }, + { + Predicate: "friend", + ValueType: pb.Posting_UID, + List: true, + NonNullable: true, + NonNullableList: true, + }, + { + Predicate: "friend_not_served", + ValueType: pb.Posting_OBJECT, + List: true, + ObjectTypeName: "Person", + }, + }, +} + func populateGraphExport(t *testing.T) { rdfEdges := []string{ `<1> <5> .`, @@ -102,6 +128,16 @@ func initTestExport(t *testing.T, schemaStr string) { txn.Set(x.SchemaKey("friend_not_served"), val) require.NoError(t, txn.CommitAt(1, nil)) txn.Discard() + + val, err = personType.Marshal() + require.NoError(t, err) + + txn = pstore.NewTransactionAt(math.MaxUint64, true) + txn.Set(x.TypeKey("Person"), val) + require.NoError(t, err) + require.NoError(t, txn.CommitAt(1, nil)) + txn.Discard() + populateGraphExport(t) } @@ -134,32 +170,26 @@ func checkExportSchema(t *testing.T, schemaFileList []string) { r, err := gzip.NewReader(f) require.NoError(t, err) + var buf bytes.Buffer + buf.ReadFrom(r) - scanner := bufio.NewScanner(r) - count := 0 - for scanner.Scan() { - result, err := schema.Parse(scanner.Text()) - require.NoError(t, err) - require.Equal(t, 1, len(result.Schemas)) - // We wrote schema for only two predicates - if result.Schemas[0].Predicate == "friend" { - require.Equal(t, "uid", types.TypeID(result.Schemas[0].ValueType).Name()) - } else { - require.Equal(t, "http://www.w3.org/2000/01/rdf-schema#range", - result.Schemas[0].Predicate) - require.Equal(t, "uid", types.TypeID(result.Schemas[0].ValueType).Name()) - } - count = len(result.Schemas) - } - require.NoError(t, scanner.Err()) - // This order will be preserved due to file naming - require.Equal(t, 1, count) + result, err := schema.Parse(buf.String()) + require.NoError(t, err) + + require.Equal(t, 2, len(result.Schemas)) + require.Equal(t, "uid", types.TypeID(result.Schemas[0].ValueType).Name()) + require.Equal(t, "http://www.w3.org/2000/01/rdf-schema#range", + result.Schemas[1].Predicate) + require.Equal(t, "uid", types.TypeID(result.Schemas[1].ValueType).Name()) + + require.Equal(t, 1, len(result.Types)) + require.True(t, proto.Equal(result.Types[0], personType)) } func TestExportRdf(t *testing.T) { // Index the name predicate. We ensure it doesn't show up on export. initTestExport(t, "name:string @index .") - // Remove already existing export folders is any. + bdir, err := ioutil.TempDir("", "export") require.NoError(t, err) defer os.RemoveAll(bdir) @@ -250,6 +280,9 @@ func TestExportRdf(t *testing.T) { } func TestExportJson(t *testing.T) { + // Index the name predicate. We ensure it doesn't show up on export. + initTestExport(t, "name:string @index .") + bdir, err := ioutil.TempDir("", "export") require.NoError(t, err) defer os.RemoveAll(bdir)