Skip to content

Commit

Permalink
Backup types (#4514)
Browse files Browse the repository at this point in the history
Types were not being properly backed up. This PR fixes the bug and changes the test to verify types and predicates appear in the restored backup.

Fixes #4507

(cherry picked from commit 261ac8e)
  • Loading branch information
martinmr authored and danielmai committed Jan 12, 2020
1 parent 5bd5209 commit 051da7a
Show file tree
Hide file tree
Showing 6 changed files with 126 additions and 43 deletions.
7 changes: 7 additions & 0 deletions ee/backup/backup.go
Original file line number Diff line number Diff line change
Expand Up @@ -123,6 +123,13 @@ func (pr *Processor) WriteBackup(ctx context.Context) (*pb.Status, error) {
if err != nil {
return false
}

// Backup type keys in every group.
if parsedKey.IsType() {
return true
}

// Only backup schema and data keys for the requested predicates.
_, ok := predMap[parsedKey.Attr]
return ok
}
Expand Down
3 changes: 2 additions & 1 deletion ee/backup/restore.go
Original file line number Diff line number Diff line change
Expand Up @@ -70,7 +70,8 @@ func RunRestore(pdir, location, backupId string) (uint64, error) {
}

// loadFromBackup reads the backup, converts the keys and values to the required format,
// and loads them to the given badger DB.
// and loads them to the given badger DB. The set of predicates is used to avoid restoring
// values from predicates no longer assigned to this group.
func loadFromBackup(db *badger.DB, r io.Reader, preds predicateSet) error {
br := bufio.NewReaderSize(r, 16<<10)
unmarshalBuf := make([]byte, 1<<10)
Expand Down
21 changes: 18 additions & 3 deletions ee/backup/tests/filesystem/backup_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -57,10 +57,16 @@ func TestBackupFilesystem(t *testing.T) {
require.NoError(t, err)
dg := dgo.NewDgraphClient(api.NewDgraphClient(conn))

// Add initial data.
ctx := context.Background()
require.NoError(t, dg.Alter(ctx, &api.Operation{DropAll: true}))
require.NoError(t, dg.Alter(ctx, &api.Operation{Schema: `movie: string .`}))

// Add schema and types.
require.NoError(t, dg.Alter(ctx, &api.Operation{Schema: `movie: string .
type Node {
movie
}`}))

// Add initial data.
original, err := dg.NewTxn().Mutate(ctx, &api.Mutation{
CommitNow: true,
SetNquads: []byte(`
Expand Down Expand Up @@ -256,10 +262,19 @@ func runRestore(t *testing.T, backupLocation, lastDir string, commitTs uint64) m
require.Equal(t, uint32(i+1), groupId)
}

restored, err := testutil.GetPValues("./data/restore/p1", "movie", commitTs)
pdir := "./data/restore/p1"
restored, err := testutil.GetPredicateValues(pdir, "movie", commitTs)
require.NoError(t, err)
t.Logf("--- Restored values: %+v\n", restored)

restoredPreds, err := testutil.GetPredicateNames(pdir, commitTs)
require.NoError(t, err)
require.ElementsMatch(t, []string{"dgraph.type", "movie"}, restoredPreds)

restoredTypes, err := testutil.GetTypeNames(pdir, commitTs)
require.NoError(t, err)
require.ElementsMatch(t, []string{"Node"}, restoredTypes)

return restored
}

Expand Down
6 changes: 3 additions & 3 deletions ee/backup/tests/minio-large/backup_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -162,11 +162,11 @@ func runRestore(t *testing.T, backupLocation, lastDir string, commitTs uint64) m
_, err := backup.RunRestore("./data/restore", backupLocation, lastDir)
require.NoError(t, err)

restored1, err := testutil.GetPValues("./data/restore/p1", "name1", commitTs)
restored1, err := testutil.GetPredicateValues("./data/restore/p1", "name1", commitTs)
require.NoError(t, err)
restored2, err := testutil.GetPValues("./data/restore/p2", "name2", commitTs)
restored2, err := testutil.GetPredicateValues("./data/restore/p2", "name2", commitTs)
require.NoError(t, err)
restored3, err := testutil.GetPValues("./data/restore/p3", "name3", commitTs)
restored3, err := testutil.GetPredicateValues("./data/restore/p3", "name3", commitTs)
require.NoError(t, err)

restored := make(map[string]string)
Expand Down
24 changes: 20 additions & 4 deletions ee/backup/tests/minio/backup_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -54,15 +54,21 @@ func TestBackupMinio(t *testing.T) {
conn, err := grpc.Dial(testutil.SockAddr, grpc.WithInsecure())
require.NoError(t, err)
dg := dgo.NewDgraphClient(api.NewDgraphClient(conn))
ctx := context.Background()
require.NoError(t, dg.Alter(ctx, &api.Operation{DropAll: true}))

mc, err = testutil.NewMinioClient()
require.NoError(t, err)
require.NoError(t, mc.MakeBucket(bucketName, ""))

ctx := context.Background()
require.NoError(t, dg.Alter(ctx, &api.Operation{DropAll: true}))

// Add schema and types.
require.NoError(t, dg.Alter(ctx, &api.Operation{Schema: `movie: string .
type Node {
movie
}`}))

// Add initial data.
require.NoError(t, dg.Alter(ctx, &api.Operation{Schema: `movie: string .`}))
original, err := dg.NewTxn().Mutate(ctx, &api.Mutation{
CommitNow: true,
SetNquads: []byte(`
Expand Down Expand Up @@ -264,8 +270,18 @@ func runRestore(t *testing.T, lastDir string, commitTs uint64) map[string]string
require.NoError(t, err)
require.Equal(t, uint32(i+1), groupId)
}
pdir := "./data/restore/p1"
restored, err := testutil.GetPredicateValues(pdir, "movie", commitTs)
require.NoError(t, err)

restoredPreds, err := testutil.GetPredicateNames(pdir, commitTs)
require.NoError(t, err)
require.ElementsMatch(t, []string{"dgraph.type", "movie"}, restoredPreds)

restoredTypes, err := testutil.GetTypeNames(pdir, commitTs)
require.NoError(t, err)
require.ElementsMatch(t, []string{"Node"}, restoredTypes)

restored, err := testutil.GetPValues("./data/restore/p1", "movie", commitTs)
require.NoError(t, err)
t.Logf("--- Restored values: %+v\n", restored)

Expand Down
108 changes: 76 additions & 32 deletions testutil/backup.go
Original file line number Diff line number Diff line change
Expand Up @@ -17,78 +17,122 @@
package testutil

import (
"context"
"fmt"
"math"

"github.com/dgraph-io/badger/v2"
"github.com/dgraph-io/badger/v2/options"
bpb "github.com/dgraph-io/badger/v2/pb"
"github.com/dgraph-io/dgraph/posting"
"github.com/dgraph-io/dgraph/protos/pb"
"github.com/dgraph-io/dgraph/types"
"github.com/dgraph-io/dgraph/x"
)

// GetPValues reads the specified p directory and returns the values for the given
// attribute in a map.
func GetPValues(pdir, attr string, readTs uint64) (map[string]string, error) {
func openDgraph(pdir string) (*badger.DB, error) {
opt := badger.DefaultOptions(pdir).WithTableLoadingMode(options.MemoryMap).
WithReadOnly(true)
db, err := badger.OpenManaged(opt)
return badger.OpenManaged(opt)
}

// GetPredicateValues reads the specified p directory and returns the values for the given
// attribute in a map.
func GetPredicateValues(pdir, attr string, readTs uint64) (map[string]string, error) {
db, err := openDgraph(pdir)
if err != nil {
return nil, err
}
defer db.Close()

values := make(map[string]string)

stream := db.NewStreamAt(math.MaxUint64)
stream.ChooseKey = func(item *badger.Item) bool {
txn := db.NewTransactionAt(readTs, false)
defer txn.Discard()
itr := txn.NewIterator(badger.DefaultIteratorOptions)
defer itr.Close()

for itr.Rewind(); itr.Valid(); itr.Next() {
item := itr.Item()
pk, err := x.Parse(item.Key())
x.Check(err)
switch {
case pk.Attr != attr:
return false
case pk.IsSchema():
return false
continue
case !pk.IsData():
continue
}
return pk.IsData()
}
stream.KeyToList = func(key []byte, it *badger.Iterator) (*bpb.KVList, error) {
pk, err := x.Parse(key)
x.Check(err)
pl, err := posting.ReadPostingList(key, it)

pl, err := posting.ReadPostingList(item.Key(), itr)
if err != nil {
return nil, err
}
var list bpb.KVList

err = pl.Iterate(readTs, 0, func(p *pb.Posting) error {
vID := types.TypeID(p.ValType)
src := types.ValueForType(vID)
src.Value = p.Value
str, err := types.Convert(src, types.StringID)
if err != nil {
fmt.Println(err)
return err
}
value := str.Value.(string)
list.Kv = append(list.Kv, &bpb.KV{
Key: []byte(fmt.Sprintf("%#x", pk.Uid)),
Value: []byte(value),
})
values[fmt.Sprintf("%#x", pk.Uid)] = value

return nil
})
return &list, err
}
stream.Send = func(list *bpb.KVList) error {
for _, kv := range list.Kv {
values[string(kv.Key)] = string(kv.Value)

if err != nil {
return nil, err
}
return nil
}
if err := stream.Orchestrate(context.Background()); err != nil {

return values, err
}

// dataType selects which kind of schema entry readSchema collects from the p directory.
type dataType int

const (
// schemaPredicate makes readSchema keep keys for which pk.IsSchema() is true and
// skip keys for which pk.IsType() is true.
schemaPredicate dataType = iota
// schemaType makes readSchema keep keys for which pk.IsType() is true and
// skip keys for which pk.IsSchema() is true.
schemaType
)

func readSchema(pdir string, dType dataType) ([]string, error) {
db, err := openDgraph(pdir)
if err != nil {
return nil, err
}
return values, err
defer db.Close()
values := make([]string, 0)

// Predicates and types in the schema are written with timestamp 1.
txn := db.NewTransactionAt(1, false)
defer txn.Discard()
itr := txn.NewIterator(badger.DefaultIteratorOptions)
defer itr.Close()

for itr.Rewind(); itr.Valid(); itr.Next() {
item := itr.Item()
pk, err := x.Parse(item.Key())
x.Check(err)

switch {
case item.UserMeta() != posting.BitSchemaPosting:
continue
case pk.IsSchema() && dType != schemaPredicate:
continue
case pk.IsType() && dType != schemaType:
continue
}

values = append(values, pk.Attr)
}
return values, nil
}

// GetPredicateNames returns the list of all the predicates stored in the restored pdir.
// It delegates to readSchema with schemaPredicate and returns its result unchanged.
//
// NOTE(review): readTs is accepted but never used; readSchema opens its own
// transaction at timestamp 1 regardless of the value passed here.
func GetPredicateNames(pdir string, readTs uint64) ([]string, error) {
return readSchema(pdir, schemaPredicate)
}

// GetTypeNames returns the list of all the types stored in the restored pdir.
// It delegates to readSchema with schemaType and returns its result unchanged.
//
// NOTE(review): readTs is accepted but never used; readSchema opens its own
// transaction at timestamp 1 regardless of the value passed here.
func GetTypeNames(pdir string, readTs uint64) ([]string, error) {
return readSchema(pdir, schemaType)
}

0 comments on commit 051da7a

Please sign in to comment.