Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Include types in results of export operation. #3493

Merged
merged 3 commits into from
Jun 3, 2019
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
78 changes: 73 additions & 5 deletions worker/export.go
Original file line number Diff line number Diff line change
Expand Up @@ -340,6 +340,55 @@ func toSchema(attr string, update pb.SchemaUpdate) (*bpb.KVList, error) {
return listWrap(kv), nil
}

// toType renders a type definition as text schema (e.g. "type Person {...}")
// and wraps it in a single KV entry so it can be routed to the schema file.
func toType(attr string, update pb.TypeUpdate) (*bpb.KVList, error) {
	var b bytes.Buffer
	fmt.Fprintf(&b, "type %s {\n", attr)
	for _, field := range update.Fields {
		b.WriteString(fieldToString(field))
	}
	b.WriteString("}\n")

	// Version 2 marks this KV as schema/type output; the export writer
	// uses it to pick the schema file over the data file.
	kv := &bpb.KV{
		Value:   b.Bytes(),
		Version: 2, // Type value
	}
	return listWrap(kv), nil
}

// fieldToString renders one field of a type as a text-schema line,
// e.g. "\tfriend: [uid!]!\n".
func fieldToString(update *pb.SchemaUpdate) string {
	// Resolve the element type: either a named object type or the
	// scalar name for the posting value type.
	var typ string
	if update.ValueType == pb.Posting_OBJECT {
		typ = update.ObjectTypeName
	} else {
		typ = types.TypeID(update.ValueType).Name()
	}

	// Decorate: "!" after the element type for NonNullable, brackets
	// around list types, and a trailing "!" for NonNullableList.
	if update.NonNullable {
		typ += "!"
	}
	if update.List {
		typ = "[" + typ + "]"
	}
	if update.NonNullableList {
		typ += "!"
	}

	var sb strings.Builder
	sb.WriteString("\t")
	sb.WriteString(update.Predicate)
	sb.WriteString(": ")
	sb.WriteString(typ)
	sb.WriteString("\n")
	return sb.String()
}

type fileWriter struct {
fd *os.File
bw *bufio.Writer
Expand Down Expand Up @@ -439,12 +488,16 @@ func export(ctx context.Context, in *pb.ExportRequest) error {
if pk.Attr == "_predicate_" {
return false
}
if servesTablet, err := groups().ServesTablet(pk.Attr); err != nil || !servesTablet {
return false

if !pk.IsType() {
if servesTablet, err := groups().ServesTablet(pk.Attr); err != nil || !servesTablet {
return false
}
}

// We need to ensure that schema keys are separately identifiable, so they can be
// written to a different file.
return pk.IsData() || pk.IsSchema()
return pk.IsData() || pk.IsSchema() || pk.IsType()
}
stream.KeyToList = func(key []byte, itr *badger.Iterator) (*bpb.KVList, error) {
item := itr.Item()
Expand All @@ -454,9 +507,10 @@ func export(ctx context.Context, in *pb.ExportRequest) error {
e.uid = pk.Uid
e.attr = pk.Attr

// Schema and type keys should be handled first because schema keys are also
// considered data keys.
switch {
case pk.IsSchema():
// Schema should be handled first. Because schema keys are also considered data keys.
var update pb.SchemaUpdate
err := item.Value(func(val []byte) error {
return update.Unmarshal(val)
Expand All @@ -468,11 +522,24 @@ func export(ctx context.Context, in *pb.ExportRequest) error {
}
return toSchema(pk.Attr, update)

case pk.IsType():
var update pb.TypeUpdate
err := item.Value(func(val []byte) error {
return update.Unmarshal(val)
})
if err != nil {
// Let's not propagate this error. We just log this and continue onwards.
glog.Errorf("Unable to unmarshal type: %+v. Err=%v\n", pk, err)
return nil, nil
}
return toType(pk.Attr, update)

case pk.IsData():
e.pl, err = posting.ReadPostingList(key, itr)
if err != nil {
return nil, err
}

switch in.Format {
case "json":
return e.toJSON()
Expand All @@ -494,11 +561,12 @@ func export(ctx context.Context, in *pb.ExportRequest) error {
switch kv.Version {
case 1: // data
writer = dataWriter
case 2: // schema
case 2: // schema and types
writer = schemaWriter
default:
glog.Fatalf("Invalid data type found: %x", kv.Key)
}

if _, err := writer.gw.Write(kv.Value); err != nil {
return err
}
Expand Down
73 changes: 53 additions & 20 deletions worker/export_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@ package worker

import (
"bufio"
"bytes"
"compress/gzip"
"context"
"io/ioutil"
Expand All @@ -29,6 +30,7 @@ import (
"testing"
"time"

"github.com/golang/protobuf/proto"
"github.com/stretchr/testify/require"

"github.com/dgraph-io/dgo/protos/api"
Expand All @@ -45,6 +47,30 @@ import (
"github.com/dgraph-io/dgraph/x"
)

// personType is the type definition written into the test store by
// initTestExport; checkExportSchema asserts the exported schema file
// round-trips back to an equal pb.TypeUpdate.
var personType = &pb.TypeUpdate{
	TypeName: "Person",
	Fields: []*pb.SchemaUpdate{
		{
			// Scalar field: exported as "name: string!".
			Predicate:   "name",
			ValueType:   pb.Posting_STRING,
			NonNullable: true,
		},
		{
			// Non-null list of non-null uids: exported as "friend: [uid!]!".
			Predicate:       "friend",
			ValueType:       pb.Posting_UID,
			List:            true,
			NonNullable:     true,
			NonNullableList: true,
		},
		{
			// Object-typed field: exported with the object type name,
			// i.e. "friend_not_served: [Person]".
			Predicate:      "friend_not_served",
			ValueType:      pb.Posting_OBJECT,
			List:           true,
			ObjectTypeName: "Person",
		},
	},
}

func populateGraphExport(t *testing.T) {
rdfEdges := []string{
`<1> <friend> <5> <author0> .`,
Expand Down Expand Up @@ -102,6 +128,16 @@ func initTestExport(t *testing.T, schemaStr string) {
txn.Set(x.SchemaKey("friend_not_served"), val)
require.NoError(t, txn.CommitAt(1, nil))
txn.Discard()

val, err = personType.Marshal()
require.NoError(t, err)

txn = pstore.NewTransactionAt(math.MaxUint64, true)
txn.Set(x.TypeKey("Person"), val)
require.NoError(t, err)
require.NoError(t, txn.CommitAt(1, nil))
txn.Discard()

populateGraphExport(t)
}

Expand Down Expand Up @@ -134,32 +170,26 @@ func checkExportSchema(t *testing.T, schemaFileList []string) {

r, err := gzip.NewReader(f)
require.NoError(t, err)
var buf bytes.Buffer
buf.ReadFrom(r)

scanner := bufio.NewScanner(r)
count := 0
for scanner.Scan() {
result, err := schema.Parse(scanner.Text())
require.NoError(t, err)
require.Equal(t, 1, len(result.Schemas))
// We wrote schema for only two predicates
if result.Schemas[0].Predicate == "friend" {
require.Equal(t, "uid", types.TypeID(result.Schemas[0].ValueType).Name())
} else {
require.Equal(t, "http://www.w3.org/2000/01/rdf-schema#range",
result.Schemas[0].Predicate)
require.Equal(t, "uid", types.TypeID(result.Schemas[0].ValueType).Name())
}
count = len(result.Schemas)
}
require.NoError(t, scanner.Err())
// This order will be preserved due to file naming
require.Equal(t, 1, count)
result, err := schema.Parse(buf.String())
require.NoError(t, err)

require.Equal(t, 2, len(result.Schemas))
require.Equal(t, "uid", types.TypeID(result.Schemas[0].ValueType).Name())
require.Equal(t, "http://www.w3.org/2000/01/rdf-schema#range",
result.Schemas[1].Predicate)
require.Equal(t, "uid", types.TypeID(result.Schemas[1].ValueType).Name())

require.Equal(t, 1, len(result.Types))
require.True(t, proto.Equal(result.Types[0], personType))
}

func TestExportRdf(t *testing.T) {
// Index the name predicate. We ensure it doesn't show up on export.
initTestExport(t, "name:string @index .")
// Remove already existing export folders, if any.

bdir, err := ioutil.TempDir("", "export")
require.NoError(t, err)
defer os.RemoveAll(bdir)
Expand Down Expand Up @@ -250,6 +280,9 @@ func TestExportRdf(t *testing.T) {
}

func TestExportJson(t *testing.T) {
// Index the name predicate. We ensure it doesn't show up on export.
initTestExport(t, "name:string @index .")

bdir, err := ioutil.TempDir("", "export")
require.NoError(t, err)
defer os.RemoveAll(bdir)
Expand Down