Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

(release/v20.03): feat(Dgraph): add utility to export backup data. #6591

Merged
merged 1 commit into from
Oct 2, 2020
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions dgraph/cmd/root_ee.go
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,7 @@ func init() {
subcommands = append(subcommands,
&backup.Restore,
&backup.LsBackup,
&backup.ExportBackup,
&acl.CmdAcl,
)
}
41 changes: 39 additions & 2 deletions ee/backup/run.go
Original file line number Diff line number Diff line change
Expand Up @@ -32,14 +32,22 @@ var Restore x.SubCommand
// LsBackup is the sub-command used to list the backups in a folder.
var LsBackup x.SubCommand

// ExportBackup is the sub-command used to export the data inside a single
// full or incremental backup.
var ExportBackup x.SubCommand

// opt holds the values of the command-line flags used by the sub-commands in
// this file. initExportBackup binds location, format and keyfile; the other
// fields are presumably bound by the restore and ls sub-commands (not visible
// here — confirm).
var opt struct {
backupId, location, pdir, zero, keyfile string
forceZero bool
backupId string
// location is the location of the backup.
location string
pdir string
zero string
// keyfile is the path of the key needed to decrypt the backup.
keyfile string
forceZero bool
// format is the format of the export output, either rdf or json.
format string
}

// init sets up the backup sub-commands by calling the init helper of each one
// when the package is loaded.
func init() {
initRestore()
initBackupLs()
initExportBackup()
}

func initRestore() {
Expand Down Expand Up @@ -243,3 +251,32 @@ func runLsbackupCmd() error {

return nil
}

func initExportBackup() {
ExportBackup.Cmd = &cobra.Command{
Use: "export_backup",
Short: "Export data inside single full or incremental backup.",
Long: ``,
Args: cobra.NoArgs,
Run: func(cmd *cobra.Command, args []string) {
defer x.StartProfile(ExportBackup.Conf).Stop()
if err := runExportBackup(); err != nil {
fmt.Fprintln(os.Stderr, err)
os.Exit(1)
}
},
}

flag := ExportBackup.Cmd.Flags()
flag.StringVarP(&opt.location, "location", "l", "",
"Sets the location of the backup. Only file URIs are supported for now.")
flag.StringVarP(&opt.format, "format", "f", "rdf",
"The format of the export output. Accepts a value of either rdf or json")
flag.StringVarP(&opt.keyfile, "encryption_key_file", "", "",
"Path of the key needed to decrypt the backup.")
}

// runExportBackup exports the backup found at opt.location using the format
// and key file given on the command line. The export directory argument is
// passed as an empty string.
func runExportBackup() error {
	var exporter worker.BackupExporter
	return exporter.ExportBackup(opt.location, "", opt.format, opt.keyfile)
}
12 changes: 10 additions & 2 deletions worker/export.go
Original file line number Diff line number Diff line change
Expand Up @@ -399,6 +399,14 @@ func export(ctx context.Context, in *pb.ExportRequest) error {
}
glog.Infof("Running export for group %d at timestamp %d.", in.GroupId, in.ReadTs)

return exportInternal(ctx, in, pstore, false)
}

// exportInternal contains the core logic to export a Dgraph database. If skipZero is set to
// true, the parts of this method that require talking to zero will be skipped. This is useful
// when exporting a p directory directly from disk without a running cluster.
func exportInternal(ctx context.Context, in *pb.ExportRequest, db *badger.DB,
skipZero bool) error {
uts := time.Unix(in.UnixTs, 0)
bdir := path.Join(x.WorkerConfig.ExportPath, fmt.Sprintf(
"dgraph.r%d.u%s", in.ReadTs, uts.UTC().Format("0102.1504")))
Expand Down Expand Up @@ -446,7 +454,7 @@ func export(ctx context.Context, in *pb.ExportRequest) error {
return errors.Wrapf(err, "cannot open export GraphQL schema file at %s", gqlSchemaPath)
}

stream := pstore.NewStreamAt(in.ReadTs)
stream := db.NewStreamAt(in.ReadTs)
stream.LogPrefix = "Export"
stream.ChooseKey = func(item *badger.Item) bool {
// Skip exporting delete data including Schema and Types.
Expand All @@ -472,7 +480,7 @@ func export(ctx context.Context, in *pb.ExportRequest) error {
return false
}

if !pk.IsType() {
if !pk.IsType() && !skipZero {
if servesTablet, err := groups().ServesTablet(pk.Attr); err != nil || !servesTablet {
return false
}
Expand Down
146 changes: 146 additions & 0 deletions worker/file_handler.go
Original file line number Diff line number Diff line change
Expand Up @@ -13,15 +13,23 @@
package worker

import (
"compress/gzip"
"context"
"encoding/json"
"fmt"
"io"
"io/ioutil"
"math"
"net/url"
"os"
"path/filepath"
"sort"
"strings"
"time"

"github.com/dgraph-io/badger/v2"
"github.com/dgraph-io/badger/v2/options"
"github.com/dgraph-io/dgraph/ee/enc"
"github.com/dgraph-io/dgraph/protos/pb"
"github.com/dgraph-io/dgraph/x"

Expand All @@ -34,6 +42,12 @@ type fileHandler struct {
fp *os.File
}

// BackupExporter wraps fileHandler by embedding it, so that the handler's
// ExportBackup method can be used by the export_backup command.
type BackupExporter struct {
fileHandler
}

// readManifest reads a manifest file at path using the handler.
// Returns nil on success, otherwise an error.
func (h *fileHandler) readManifest(path string, m *Manifest) error {
Expand Down Expand Up @@ -240,3 +254,135 @@ func pathExist(path string) bool {
}
return !os.IsNotExist(err) && !os.IsPermission(err)
}

// ExportBackup restores the backup found in backupDir into a temporary
// directory, one badger DB per group, and then runs an export of every
// restored group in the given format.
//
// backupDir must contain the backup manifest and the per-group backup files.
// exportDir is converted to an absolute path and created if it does not exist.
// format must be "json" or "rdf". keyfile is the path of the key file that is
// handed to enc.GetReader and enc.ReadEncryptionKeyFile to read the backup.
//
// NOTE(review): exportDir is only created here; it is not forwarded to
// exportInternal, which derives its output directory from
// x.WorkerConfig.ExportPath — confirm this is intended.
//
// The temporary directory is removed before returning, both on success and on
// failure. An error is returned if the format is invalid, the manifest cannot
// be read or contains no data, or any group fails to restore or export.
func (h *fileHandler) ExportBackup(backupDir, exportDir, format string,
	keyfile string) error {
	if format != "json" && format != "rdf" {
		return errors.Errorf("invalid format %s", format)
	}

	// Create exportDir and temporary folder to store the restored backup.
	// The absolute path is kept in its own variable so that the error message
	// still shows the path the caller supplied.
	absExportDir, err := filepath.Abs(exportDir)
	if err != nil {
		return errors.Wrapf(err, "cannot convert path %s to absolute path", exportDir)
	}
	if err := os.MkdirAll(absExportDir, 0755); err != nil {
		return errors.Wrapf(err, "cannot create dir %s", absExportDir)
	}
	tmpDir, err := ioutil.TempDir("", "export_backup")
	if err != nil {
		return errors.Wrapf(err, "cannot create temp dir")
	}
	// Make sure the restored data does not outlive this call when an error
	// path is taken. The success path removes the directory explicitly so a
	// removal failure can be reported to the caller.
	tmpDirRemoved := false
	defer func() {
		if !tmpDirRemoved {
			// Best effort: the error that caused the early return is the one
			// worth reporting, so a failure to clean up is ignored here.
			_ = os.RemoveAll(tmpDir)
		}
	}()

	// openDB opens the badger DB at dir with the options used for both the
	// restore and the export step.
	openDB := func(dir string) (*badger.DB, error) {
		db, err := badger.OpenManaged(badger.DefaultOptions(dir).
			WithSyncWrites(false).
			WithTableLoadingMode(options.MemoryMap).
			WithValueThreshold(1 << 10).
			WithNumVersionsToKeep(math.MaxInt32).
			WithEncryptionKey(enc.ReadEncryptionKeyFile(keyfile)))
		if err != nil {
			return nil, errors.Wrapf(err, "cannot open DB at %s", dir)
		}
		return db, nil
	}

	// loadFn loads a single backup file into the p directory of its group.
	loadFn := func(r io.Reader, groupId uint32, preds predicateSet) error {
		dir := filepath.Join(tmpDir, fmt.Sprintf("p%d", groupId))

		decReader, err := enc.GetReader(keyfile, r)
		if err != nil {
			return err
		}

		gzReader, err := gzip.NewReader(decReader)
		if err != nil {
			if len(keyfile) != 0 {
				err = errors.Wrap(err,
					"Unable to read the backup. Ensure the encryption key is correct.")
			}
			return errors.Wrapf(err, "cannot create gzip reader")
		}
		// The badger DB should be opened only after creating the backup
		// file reader and verifying the encryption in the backup file.
		db, err := openDB(dir)
		if err != nil {
			return err
		}
		defer db.Close()
		if _, err := loadFromBackup(db, gzReader, preds); err != nil {
			return errors.Wrapf(err, "cannot load backup")
		}
		return x.WriteGroupIdFile(dir, groupId)
	}

	// Read manifest from folder.
	manifest := &Manifest{}
	manifestPath := filepath.Join(backupDir, backupManifest)
	if err := h.ReadManifest(manifestPath, manifest); err != nil {
		return errors.Wrapf(err, "cannot read manifest at %s", manifestPath)
	}
	manifest.Path = manifestPath
	if manifest.Since == 0 || len(manifest.Groups) == 0 {
		return errors.Errorf("no data found in backup")
	}

	// restoreGroup restores the backup file of one group. It is a separate
	// function so that the backup file is closed as soon as the group has been
	// loaded instead of staying open until ExportBackup returns.
	restoreGroup := func(gid uint32) error {
		file := filepath.Join(backupDir, backupName(manifest.Since, gid))
		fp, err := os.Open(file)
		if err != nil {
			return errors.Wrapf(err, "cannot open backup file at %s", file)
		}
		defer fp.Close()

		// Only restore the predicates that were assigned to this group at the
		// time of the last backup.
		return loadFn(fp, gid, manifest.getPredsInGroup(gid))
	}

	// Restore backup to disk.
	for gid := range manifest.Groups {
		if err := restoreGroup(gid); err != nil {
			return err
		}
	}

	// Use a single timestamp for every group: exportInternal derives the
	// output directory name from it, so all groups must agree on its value.
	unixTs := time.Now().Unix()

	// exportGroup exports the restored p directory of one group. The DB is
	// closed before the function returns, i.e. before the result is published,
	// so the temporary directory is never removed under an open DB.
	exportGroup := func(group uint32) error {
		dir := filepath.Join(tmpDir, fmt.Sprintf("p%d", group))
		db, err := openDB(dir)
		if err != nil {
			return err
		}
		defer db.Close()

		req := &pb.ExportRequest{
			GroupId: group,
			ReadTs:  manifest.Since,
			UnixTs:  unixTs,
			Format:  format,
		}

		err = exportInternal(context.Background(), req, db, true)
		// Wrapf returns nil when err is nil.
		return errors.Wrapf(err, "cannot export data inside DB at %s", dir)
	}

	// Export the data from the p directories produced by the last step.
	ch := make(chan error, len(manifest.Groups))
	for gid := range manifest.Groups {
		go func(group uint32) {
			ch <- exportGroup(group)
		}(gid)
	}

	// Wait for every group before returning, even after a failure, so that no
	// goroutine is still working inside tmpDir when it gets removed.
	var firstErr error
	for i := 0; i < len(manifest.Groups); i++ {
		if err := <-ch; err != nil && firstErr == nil {
			firstErr = err
		}
	}
	if firstErr != nil {
		return firstErr
	}

	// Clean up temporary directory.
	tmpDirRemoved = true
	if err := os.RemoveAll(tmpDir); err != nil {
		return errors.Wrapf(err, "cannot remove temp directory at %s", tmpDir)
	}

	return nil
}