Skip to content

Commit

Permalink
Include types in results of export operation. (#3493)
Browse files Browse the repository at this point in the history
  • Loading branch information
martinmr authored Jun 3, 2019
1 parent e19c044 commit a41db40
Show file tree
Hide file tree
Showing 2 changed files with 126 additions and 25 deletions.
78 changes: 73 additions & 5 deletions worker/export.go
Original file line number Diff line number Diff line change
Expand Up @@ -341,6 +341,55 @@ func toSchema(attr string, update pb.SchemaUpdate) (*bpb.KVList, error) {
return listWrap(kv), nil
}

// toType renders the type definition named attr as schema text:
//
//	type <attr> {
//	<one line per field, as produced by fieldToString>
//	}
//
// The text is stored as the Value of a KV whose Version is 2, the same
// marker the export writer uses to route entries to the schema file, and
// the KV is handed to listWrap to build the returned list. The returned
// error is always nil.
func toType(attr string, update pb.TypeUpdate) (*bpb.KVList, error) {
	var out bytes.Buffer
	fmt.Fprintf(&out, "type %s {\n", attr)
	for i := range update.Fields {
		out.WriteString(fieldToString(update.Fields[i]))
	}
	out.WriteString("}\n")

	return listWrap(&bpb.KV{
		Value:   out.Bytes(),
		Version: 2, // Type value
	}), nil
}

// fieldToString formats a single field of a type definition as one
// tab-indented, newline-terminated line:
//
//	\t<predicate>: <type>\n
//
// where <type> is the field's object type name when ValueType is
// pb.Posting_OBJECT and the scalar type's name otherwise. The type is
// decorated in this order: wrapped in "[" ... "]" when the field is a list,
// followed by "!" inside the brackets when NonNullable is set, and followed
// by "!" after the closing bracket when NonNullableList is set.
func fieldToString(update *pb.SchemaUpdate) string {
	// Resolve the type name first so the writes below read top to bottom
	// in output order.
	typeName := update.ObjectTypeName
	if update.ValueType != pb.Posting_OBJECT {
		typeName = types.TypeID(update.ValueType).Name()
	}

	var sb strings.Builder
	sb.WriteString("\t" + update.Predicate + ": ")
	if update.List {
		sb.WriteByte('[')
	}
	sb.WriteString(typeName)
	if update.NonNullable {
		sb.WriteByte('!')
	}
	if update.List {
		sb.WriteByte(']')
	}
	if update.NonNullableList {
		sb.WriteByte('!')
	}
	sb.WriteByte('\n')
	return sb.String()
}

type fileWriter struct {
fd *os.File
bw *bufio.Writer
Expand Down Expand Up @@ -440,12 +489,16 @@ func export(ctx context.Context, in *pb.ExportRequest) error {
if pk.Attr == "_predicate_" {
return false
}
if servesTablet, err := groups().ServesTablet(pk.Attr); err != nil || !servesTablet {
return false

if !pk.IsType() {
if servesTablet, err := groups().ServesTablet(pk.Attr); err != nil || !servesTablet {
return false
}
}

// We need to ensure that schema keys are separately identifiable, so they can be
// written to a different file.
return pk.IsData() || pk.IsSchema()
return pk.IsData() || pk.IsSchema() || pk.IsType()
}
stream.KeyToList = func(key []byte, itr *badger.Iterator) (*bpb.KVList, error) {
item := itr.Item()
Expand All @@ -455,9 +508,10 @@ func export(ctx context.Context, in *pb.ExportRequest) error {
e.uid = pk.Uid
e.attr = pk.Attr

// Schema and type keys should be handled first because schema keys are also
// considered data keys.
switch {
case pk.IsSchema():
// Schema should be handled first. Because schema keys are also considered data keys.
var update pb.SchemaUpdate
err := item.Value(func(val []byte) error {
return update.Unmarshal(val)
Expand All @@ -469,11 +523,24 @@ func export(ctx context.Context, in *pb.ExportRequest) error {
}
return toSchema(pk.Attr, update)

case pk.IsType():
var update pb.TypeUpdate
err := item.Value(func(val []byte) error {
return update.Unmarshal(val)
})
if err != nil {
// Let's not propagate this error. We just log this and continue onwards.
glog.Errorf("Unable to unmarshal type: %+v. Err=%v\n", pk, err)
return nil, nil
}
return toType(pk.Attr, update)

case pk.IsData():
e.pl, err = posting.ReadPostingList(key, itr)
if err != nil {
return nil, err
}

switch in.Format {
case "json":
return e.toJSON()
Expand All @@ -495,11 +562,12 @@ func export(ctx context.Context, in *pb.ExportRequest) error {
switch kv.Version {
case 1: // data
writer = dataWriter
case 2: // schema
case 2: // schema and types
writer = schemaWriter
default:
glog.Fatalf("Invalid data type found: %x", kv.Key)
}

if _, err := writer.gw.Write(kv.Value); err != nil {
return err
}
Expand Down
73 changes: 53 additions & 20 deletions worker/export_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@ package worker

import (
"bufio"
"bytes"
"compress/gzip"
"context"
"io/ioutil"
Expand All @@ -29,6 +30,7 @@ import (
"testing"
"time"

"github.com/golang/protobuf/proto"
"github.com/stretchr/testify/require"

"github.com/dgraph-io/dgo/protos/api"
Expand All @@ -45,6 +47,30 @@ import (
"github.com/dgraph-io/dgraph/x"
)

// personType is the type definition fixture used by the export tests. It is
// marshalled and stored under the "Person" type key during test setup, and
// the types parsed back from the exported schema file are compared against
// it. Its three fields cover a non-nullable scalar, a list that is
// non-nullable both per element and as a whole, and a list of objects that
// refer back to the Person type itself.
var personType = &pb.TypeUpdate{
TypeName: "Person",
Fields: []*pb.SchemaUpdate{
{
Predicate: "name",
ValueType: pb.Posting_STRING,
NonNullable: true,
},
{
Predicate: "friend",
ValueType: pb.Posting_UID,
List: true,
NonNullable: true,
NonNullableList: true,
},
{
Predicate: "friend_not_served",
ValueType: pb.Posting_OBJECT,
List: true,
ObjectTypeName: "Person",
},
},
}

func populateGraphExport(t *testing.T) {
rdfEdges := []string{
`<1> <friend> <5> <author0> .`,
Expand Down Expand Up @@ -102,6 +128,16 @@ func initTestExport(t *testing.T, schemaStr string) {
txn.Set(x.SchemaKey("friend_not_served"), val)
require.NoError(t, txn.CommitAt(1, nil))
txn.Discard()

val, err = personType.Marshal()
require.NoError(t, err)

txn = pstore.NewTransactionAt(math.MaxUint64, true)
txn.Set(x.TypeKey("Person"), val)
require.NoError(t, err)
require.NoError(t, txn.CommitAt(1, nil))
txn.Discard()

populateGraphExport(t)
}

Expand Down Expand Up @@ -134,32 +170,26 @@ func checkExportSchema(t *testing.T, schemaFileList []string) {

r, err := gzip.NewReader(f)
require.NoError(t, err)
var buf bytes.Buffer
buf.ReadFrom(r)

scanner := bufio.NewScanner(r)
count := 0
for scanner.Scan() {
result, err := schema.Parse(scanner.Text())
require.NoError(t, err)
require.Equal(t, 1, len(result.Schemas))
// We wrote schema for only two predicates
if result.Schemas[0].Predicate == "friend" {
require.Equal(t, "uid", types.TypeID(result.Schemas[0].ValueType).Name())
} else {
require.Equal(t, "http://www.w3.org/2000/01/rdf-schema#range",
result.Schemas[0].Predicate)
require.Equal(t, "uid", types.TypeID(result.Schemas[0].ValueType).Name())
}
count = len(result.Schemas)
}
require.NoError(t, scanner.Err())
// This order will be preserved due to file naming
require.Equal(t, 1, count)
result, err := schema.Parse(buf.String())
require.NoError(t, err)

require.Equal(t, 2, len(result.Schemas))
require.Equal(t, "uid", types.TypeID(result.Schemas[0].ValueType).Name())
require.Equal(t, "http://www.w3.org/2000/01/rdf-schema#range",
result.Schemas[1].Predicate)
require.Equal(t, "uid", types.TypeID(result.Schemas[1].ValueType).Name())

require.Equal(t, 1, len(result.Types))
require.True(t, proto.Equal(result.Types[0], personType))
}

func TestExportRdf(t *testing.T) {
// Index the name predicate. We ensure it doesn't show up on export.
initTestExport(t, "name:string @index .")
// Remove already existing export folders if any.

bdir, err := ioutil.TempDir("", "export")
require.NoError(t, err)
defer os.RemoveAll(bdir)
Expand Down Expand Up @@ -250,6 +280,9 @@ func TestExportRdf(t *testing.T) {
}

func TestExportJson(t *testing.T) {
// Index the name predicate. We ensure it doesn't show up on export.
initTestExport(t, "name:string @index .")

bdir, err := ioutil.TempDir("", "export")
require.NoError(t, err)
defer os.RemoveAll(bdir)
Expand Down

0 comments on commit a41db40

Please sign in to comment.