Skip to content

Commit

Permalink
Use backwards-compatible formats during backup (#3629)
Browse files Browse the repository at this point in the history
This change converts the keys and posting lists to a
backwards-compatible format so that backups work accross versions of
Dgraph. The restore logic is also changed to convert the data back to
the internal Dgraph formats.
  • Loading branch information
martinmr authored Jul 11, 2019
1 parent 3590a40 commit 30ad3a4
Show file tree
Hide file tree
Showing 3 changed files with 288 additions and 39 deletions.
150 changes: 144 additions & 6 deletions ee/backup/backup.go
Original file line number Diff line number Diff line change
Expand Up @@ -13,17 +13,25 @@
package backup

import (
"bytes"
"compress/gzip"
"context"
"encoding/binary"
"encoding/hex"
"encoding/json"
"fmt"
"io"
"net/url"
"sync"

"github.com/dgraph-io/badger"
bpb "github.com/dgraph-io/badger/pb"
"github.com/golang/glog"
"github.com/pkg/errors"

"github.com/dgraph-io/dgraph/posting"
"github.com/dgraph-io/dgraph/protos/pb"
"github.com/dgraph-io/dgraph/x"
"github.com/golang/glog"
)

// Processor handles the different stages of the backup process.
Expand Down Expand Up @@ -93,24 +101,33 @@ func (pr *Processor) WriteBackup(ctx context.Context) (*pb.Status, error) {
predMap[pred] = struct{}{}
}

var maxVersion uint64
gzWriter := gzip.NewWriter(handler)
stream := pr.DB.NewStreamAt(pr.Request.ReadTs)
stream.LogPrefix = "Dgraph.Backup"
stream.KeyToList = pr.toBackupList
stream.ChooseKey = func(item *badger.Item) bool {
parsedKey := x.Parse(item.Key())
_, ok := predMap[parsedKey.Attr]
return ok
}
gzWriter := gzip.NewWriter(handler)
newSince, err := stream.Backup(gzWriter, pr.Request.SinceTs)
stream.Send = func(list *bpb.KVList) error {
for _, kv := range list.Kv {
if maxVersion < kv.Version {
maxVersion = kv.Version
}
}
return writeKVList(list, gzWriter)
}

if err != nil {
if err := stream.Orchestrate(context.Background()); err != nil {
glog.Errorf("While taking backup: %v", err)
return &emptyRes, err
}

if newSince > pr.Request.ReadTs {
if maxVersion > pr.Request.ReadTs {
glog.Errorf("Max timestamp seen during backup (%d) is greater than readTs (%d)",
newSince, pr.Request.ReadTs)
maxVersion, pr.Request.ReadTs)
}

glog.V(2).Infof("Backup group %d version: %d", pr.Request.GroupId, pr.Request.ReadTs)
Expand Down Expand Up @@ -161,3 +178,124 @@ func (pr *Processor) CompleteBackup(ctx context.Context, manifest *Manifest) err
func (m *Manifest) GoString() string {
return fmt.Sprintf(`Manifest{Since: %d, Groups: %v}`, m.Since, m.Groups)
}

func (pr *Processor) toBackupList(key []byte, itr *badger.Iterator) (*bpb.KVList, error) {
list := &bpb.KVList{}

for itr.Valid() {
item := itr.Item()
if !bytes.Equal(item.Key(), key) {
return list, nil
}
if item.Version() < pr.Request.SinceTs {
// Ignore versions less than given timestamp, or skip older versions of
// the given key.
return list, nil
}

switch item.UserMeta() {
case posting.BitEmptyPosting, posting.BitCompletePosting, posting.BitDeltaPosting:
l, err := posting.ReadPostingList(key, itr)
if err != nil {
return nil, errors.Wrapf(err, "while reading posting list")
}
kvs, err := l.Rollup()
if err != nil {
return nil, errors.Wrapf(err, "while rolling up list")
}

for _, kv := range kvs {
backupKey, err := toBackupKey(kv.Key)
if err != nil {
return nil, err
}
kv.Key = backupKey

backupPl, err := toBackupPostingList(kv.Value)
if err != nil {
return nil, err
}
kv.Value = backupPl
}
list.Kv = append(list.Kv, kvs...)

case posting.BitSchemaPosting:
var valCopy []byte
if !item.IsDeletedOrExpired() {
// No need to copy value if item is deleted or expired.
var err error
valCopy, err = item.ValueCopy(nil)
if err != nil {
return nil, errors.Wrapf(err, "while copying value")
}
}

backupKey, err := toBackupKey(key)
if err != nil {
return nil, err
}

kv := &bpb.KV{
Key: backupKey,
Value: valCopy,
UserMeta: []byte{item.UserMeta()},
Version: item.Version(),
ExpiresAt: item.ExpiresAt(),
}
list.Kv = append(list.Kv, kv)

if item.DiscardEarlierVersions() || item.IsDeletedOrExpired() {
return list, nil
}

// Manually advance the iterator. This cannot be done in the for
// statement because ReadPostingList advances the iterator so this
// only needs to be done for BitSchemaPosting entries.
itr.Next()

default:
return nil, errors.Errorf(
"Unexpected meta: %d for key: %s", item.UserMeta(), hex.Dump(key))
}
}

// This shouldn't be reached but it's being added here because the golang
// compiler complains about the missing return statement.
return list, nil
}

func toBackupKey(key []byte) ([]byte, error) {
parsedKey := x.Parse(key)
if parsedKey == nil {
return nil, errors.Errorf("could not parse key %s", hex.Dump(key))
}
backupKey, err := parsedKey.ToBackupKey().Marshal()
if err != nil {
return nil, errors.Wrapf(err, "while converting key for backup")
}
return backupKey, nil
}

func toBackupPostingList(val []byte) ([]byte, error) {
pl := &pb.PostingList{}
if err := pl.Unmarshal(val); err != nil {
return nil, errors.Wrapf(err, "while reading posting list")
}
backupVal, err := posting.ToBackupPostingList(pl).Marshal()
if err != nil {
return nil, errors.Wrapf(err, "while converting posting list for backup")
}
return backupVal, nil
}

func writeKVList(list *bpb.KVList, w io.Writer) error {
if err := binary.Write(w, binary.LittleEndian, uint64(list.Size())); err != nil {
return err
}
buf, err := list.Marshal()
if err != nil {
return err
}
_, err = w.Write(buf)
return err
}
144 changes: 144 additions & 0 deletions ee/backup/restore.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,144 @@
// +build !oss

/*
* Copyright 2019 Dgraph Labs, Inc. and Contributors
*
* Licensed under the Dgraph Community License (the "License"); you
* may not use this file except in compliance with the License. You
* may obtain a copy of the License at
*
* https://github.com/dgraph-io/dgraph/blob/master/licenses/DCL.txt
*/

package backup

import (
"bufio"
"compress/gzip"
"encoding/binary"
"encoding/hex"
"fmt"
"io"
"math"
"path/filepath"

"github.com/dgraph-io/badger"
"github.com/dgraph-io/badger/options"
bpb "github.com/dgraph-io/badger/pb"
"github.com/pkg/errors"

"github.com/dgraph-io/dgraph/posting"
"github.com/dgraph-io/dgraph/protos/pb"
"github.com/dgraph-io/dgraph/x"
)

// RunRestore calls badger.Load and tries to load data into a new DB.
func RunRestore(pdir, location, backupId string) (uint64, error) {
// Scan location for backup files and load them. Each file represents a node group,
// and we create a new p dir for each.
return Load(location, backupId, func(r io.Reader, groupId int) error {
dir := filepath.Join(pdir, fmt.Sprintf("p%d", groupId))
db, err := badger.OpenManaged(badger.DefaultOptions(dir).
WithSyncWrites(false).
WithTableLoadingMode(options.MemoryMap).
WithValueThreshold(1 << 10).
WithNumVersionsToKeep(math.MaxInt32))
if err != nil {
return err
}
defer db.Close()
fmt.Printf("Restoring groupId: %d\n", groupId)
if !pathExist(dir) {
fmt.Println("Creating new db:", dir)
}
gzReader, err := gzip.NewReader(r)
if err != nil {
return nil
}
return loadFromBackup(db, gzReader, 16)
})
}

// loadFromBackup reads the backup, converts the keys and values to the required format,
// and loads them to the given badger DB.
func loadFromBackup(db *badger.DB, r io.Reader, maxPendingWrites int) error {
br := bufio.NewReaderSize(r, 16<<10)
unmarshalBuf := make([]byte, 1<<10)

loader := db.NewKVLoader(maxPendingWrites)
for {
var sz uint64
err := binary.Read(br, binary.LittleEndian, &sz)
if err == io.EOF {
break
} else if err != nil {
return err
}

if cap(unmarshalBuf) < int(sz) {
unmarshalBuf = make([]byte, sz)
}

if _, err = io.ReadFull(br, unmarshalBuf[:sz]); err != nil {
return err
}

list := &bpb.KVList{}
if err := list.Unmarshal(unmarshalBuf[:sz]); err != nil {
return err
}

for _, kv := range list.Kv {
if len(kv.GetUserMeta()) != 1 {
return errors.Errorf(
"Unexpected meta: %v for key: %s", kv.UserMeta, hex.Dump(kv.Key))
}

restoreKey, err := fromBackupKey(kv.Key)
if err != nil {
return err
}

var restoreVal []byte
switch kv.GetUserMeta()[0] {
case posting.BitEmptyPosting, posting.BitCompletePosting, posting.BitDeltaPosting:
var err error
backupPl := &pb.BackupPostingList{}
if err := backupPl.Unmarshal(kv.Value); err != nil {
return errors.Wrapf(err, "while reading backup posting list")
}
restoreVal, err = posting.FromBackupPostingList(backupPl).Marshal()
if err != nil {
return errors.Wrapf(err, "while converting backup posting list")
}

case posting.BitSchemaPosting:
restoreVal = kv.Value

default:
return errors.Errorf(
"Unexpected meta %d for key %s", kv.UserMeta[0], hex.Dump(kv.Key))
}

kv.Key = restoreKey
kv.Value = restoreVal
if err := loader.Set(kv); err != nil {
return err
}
}
}

if err := loader.Finish(); err != nil {
return err
}

return nil
}

func fromBackupKey(key []byte) ([]byte, error) {
backupKey := &pb.BackupKey{}
if err := backupKey.Unmarshal(key); err != nil {
return nil, errors.Wrapf(err, "while reading backup key %s", hex.Dump(key))
}
return x.FromBackupKey(backupKey), nil
}
33 changes: 0 additions & 33 deletions ee/backup/run.go
Original file line number Diff line number Diff line change
Expand Up @@ -13,17 +13,11 @@
package backup

import (
"compress/gzip"
"context"
"fmt"
"io"
"math"
"os"
"path/filepath"
"time"

"github.com/dgraph-io/badger"
"github.com/dgraph-io/badger/options"
"github.com/dgraph-io/dgraph/protos/pb"
"github.com/dgraph-io/dgraph/x"
"github.com/pkg/errors"
Expand Down Expand Up @@ -213,33 +207,6 @@ func runRestoreCmd() error {
return nil
}

// RunRestore calls badger.Load and tries to load data into a new DB.
func RunRestore(pdir, location, backupId string) (uint64, error) {
// Scan location for backup files and load them. Each file represents a node group,
// and we create a new p dir for each.
return Load(location, backupId, func(r io.Reader, groupId int) error {
dir := filepath.Join(pdir, fmt.Sprintf("p%d", groupId))
db, err := badger.OpenManaged(badger.DefaultOptions(dir).
WithSyncWrites(true).
WithTableLoadingMode(options.MemoryMap).
WithValueThreshold(1 << 10).
WithNumVersionsToKeep(math.MaxInt32))
if err != nil {
return err
}
defer db.Close()
fmt.Printf("Restoring groupId: %d\n", groupId)
if !pathExist(dir) {
fmt.Println("Creating new db:", dir)
}
gzReader, err := gzip.NewReader(r)
if err != nil {
return nil
}
return db.Load(gzReader, 16)
})
}

func runLsbackupCmd() error {
fmt.Println("Listing backups from:", opt.location)
manifests, err := ListManifests(opt.location)
Expand Down

0 comments on commit 30ad3a4

Please sign in to comment.