Allow partial restores and restoring different backup series. #3547

Merged: 12 commits on Jun 13, 2019
9 changes: 7 additions & 2 deletions dgraph/cmd/alpha/admin_backup.go
@@ -131,8 +131,13 @@ func processHttpBackupRequest(ctx context.Context, r *http.Request) error {
}
}

m := backup.Manifest{Groups: groups}
m.Since = req.ReadTs
m := backup.Manifest{Groups: groups, Since: req.ReadTs}
if req.SinceTs == 0 {
m.Type = "full"
} else {
m.Type = "incremental"
}

bp := &backup.Processor{Request: &req}
return bp.CompleteBackup(ctx, &m)
}
1 change: 1 addition & 0 deletions ee/backup/backup.go
@@ -38,6 +38,7 @@ type Processor struct {
// Groups are the IDs of the groups involved.
type Manifest struct {
sync.Mutex
Type string `json:"type"` // Either full or incremental.
Since uint64 `json:"since"`
Groups []uint32 `json:"groups"`
}
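
For illustration only (not part of this diff): given the json tags above, a marshaled manifest now carries the backup type alongside since and groups. A minimal, self-contained sketch with made-up values; the embedded sync.Mutex is left out because it contributes nothing to the JSON output.

package main

import (
    "encoding/json"
    "fmt"
)

// manifest mirrors the exported fields of backup.Manifest above.
type manifest struct {
    Type   string   `json:"type"`
    Since  uint64   `json:"since"`
    Groups []uint32 `json:"groups"`
}

func main() {
    m := manifest{Type: "incremental", Since: 20546, Groups: []uint32{1, 2, 3}}
    out, err := json.Marshal(&m)
    if err != nil {
        panic(err)
    }
    fmt.Println(string(out))
    // Prints: {"type":"incremental","since":20546,"groups":[1,2,3]}
}
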
46 changes: 30 additions & 16 deletions ee/backup/file_handler.go
@@ -111,42 +111,56 @@ func (h *fileHandler) CreateManifest(uri *url.URL, req *pb.BackupRequest) error

// Load tries to load any backup files found.
// Returns the maximum value of Since on success, error otherwise.
func (h *fileHandler) Load(uri *url.URL, fn loadFn) (uint64, error) {
func (h *fileHandler) Load(uri *url.URL, lastDir string, fn loadFn) (uint64, error) {
if !pathExist(uri.Path) {
return 0, errors.Errorf("The path %q does not exist or it is inaccessible.", uri.Path)
}

suffix := filepath.Join(string(filepath.Separator), backupManifest)
manifests := x.WalkPathFunc(uri.Path, func(path string, isdir bool) bool {
paths := x.WalkPathFunc(uri.Path, func(path string, isdir bool) bool {
return !isdir && strings.HasSuffix(path, suffix)
})
if len(manifests) == 0 {
if len(paths) == 0 {
return 0, errors.Errorf("No manifests found at path: %s", uri.Path)
}
sort.Strings(manifests)
sort.Strings(paths)
if glog.V(3) {
fmt.Printf("Found backup manifest(s): %v\n", manifests)
fmt.Printf("Found backup manifest(s): %v\n", paths)
}

// Read and filter the files to get the list of files to consider
// for this restore operation.
var files []*manifestFile
for _, path := range paths {
var m Manifest
if err := h.readManifest(path, &m); err != nil {
return 0, errors.Wrapf(err, "While reading %q", path)
}
files = append(files, &manifestFile{
path: path,
manifest: &m,
})
}
files, err := filterManifests(files, lastDir)
if err != nil {
return 0, err
}

// Process each manifest, first check that they are valid and then confirm the
// backup files for each group exist. Each group in a manifest must have a backup file;
// otherwise this is a failure and the user must remedy it.
var since uint64
for _, manifest := range manifests {
var m Manifest
if err := h.readManifest(manifest, &m); err != nil {
return 0, errors.Wrapf(err, "While reading %q", manifest)
}
if m.Since == 0 || len(m.Groups) == 0 {
for i, f := range files {
if f.manifest.Since == 0 || len(f.manifest.Groups) == 0 {
if glog.V(2) {
fmt.Printf("Restore: skip backup: %s: %#v\n", manifest, &m)
fmt.Printf("Restore: skip backup: %s: %#v\n", f.path, f.manifest)
}
continue
}

path := filepath.Dir(manifest)
for _, groupId := range m.Groups {
file := filepath.Join(path, fmt.Sprintf(backupNameFmt, m.Since, groupId))
path := filepath.Dir(paths[i])
for _, groupId := range f.manifest.Groups {
file := filepath.Join(path, fmt.Sprintf(backupNameFmt, f.manifest.Since, groupId))
fp, err := os.Open(file)
if err != nil {
return 0, errors.Wrapf(err, "Failed to open %q", file)
@@ -156,7 +170,7 @@ func (h *fileHandler) Load(uri *url.URL, fn loadFn) (uint64, error) {
return 0, err
}
}
since = m.Since
since = f.manifest.Since
}
return since, nil
}
48 changes: 43 additions & 5 deletions ee/backup/handler.go
@@ -13,8 +13,10 @@
package backup

import (
"fmt"
"io"
"net/url"
"strings"

"github.com/dgraph-io/dgraph/protos/pb"

@@ -69,9 +71,11 @@ type UriHandler interface {
CreateManifest(*url.URL, *pb.BackupRequest) error

// Load will scan the location URI for backup files, then load them via loadFn.
// It optionally takes the name of the last directory to consider. Any backup directories
// created after it will be ignored.
// Objects implementing this function will be used for retrieving (downloading) backup files
// and loading the data into a DB. The restore CLI command uses this call.
Load(*url.URL, loadFn) (uint64, error)
Load(*url.URL, string, loadFn) (uint64, error)

// ListManifests will scan the provided URI and return the paths to the manifests stored
// in that location.
@@ -129,10 +133,11 @@ func NewUriHandler(uri *url.URL) (UriHandler, error) {
// A reader and the backup groupId are passed as arguments.
type loadFn func(reader io.Reader, groupId int) error

// Load will scan location l for backup files, then load them sequentially through reader.
// Load will scan the given location for backup files (skipping any directories
// created after lastDir), then load them sequentially through reader.
// Returns the maximum Since value on success, otherwise an error.
func Load(l string, fn loadFn) (since uint64, err error) {
uri, err := url.Parse(l)
func Load(location, lastDir string, fn loadFn) (since uint64, err error) {
uri, err := url.Parse(location)
if err != nil {
return 0, err
}
@@ -142,7 +147,7 @@ func Load(l string, fn loadFn) (since uint64, err error) {
return 0, errors.Errorf("Unsupported URI: %v", uri)
}

return h.Load(uri, fn)
return h.Load(uri, lastDir, fn)
}

// ListManifests scans location l for backup files and returns the list of manifests.
@@ -173,3 +178,36 @@ func ListManifests(l string) (map[string]*Manifest, error) {

return listedManifests, nil
}

type manifestFile struct {
path string
manifest *Manifest
}

// filterManifests takes a list of manifests along with their paths, and returns the
// list of manifests that should be considered during a restore.
func filterManifests(files []*manifestFile, lastDir string) ([]*manifestFile, error) {
// Go through the files in reverse order and stop when the latest full backup is found.
var filteredManifests []*manifestFile
for i := len(files) - 1; i >= 0; i-- {
parts := strings.Split(files[i].path, "/")
dir := parts[len(parts)-2]
if len(lastDir) > 0 && dir > lastDir {
fmt.Printf("Restore: skip directory %s because it's newer than %s.\n", dir, lastDir)
continue
}

filteredManifests = append(filteredManifests, files[i])
if files[i].manifest.Type == "full" {
break
}
}

// Reverse the filtered list since the original iteration happened in reverse.
for i := len(filteredManifests)/2 - 1; i >= 0; i-- {
opp := len(filteredManifests) - 1 - i
filteredManifests[i], filteredManifests[opp] = filteredManifests[opp], filteredManifests[i]
}

return filteredManifests, nil
}
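
To make the selection rule concrete, here is a hedged test sketch that would live in the same package (the paths, directory names, and Since values are invented for illustration): directories lexically after lastDir are skipped, and the backwards walk stops once a full backup has been included.

package backup

import "testing"

func TestFilterManifestsStopsAtFullBackup(t *testing.T) {
    files := []*manifestFile{
        {path: "/backups/dgraph.1/manifest.json", manifest: &Manifest{Type: "full", Since: 100}},
        {path: "/backups/dgraph.2/manifest.json", manifest: &Manifest{Type: "incremental", Since: 200}},
        {path: "/backups/dgraph.3/manifest.json", manifest: &Manifest{Type: "full", Since: 300}},
        {path: "/backups/dgraph.4/manifest.json", manifest: &Manifest{Type: "incremental", Since: 400}},
    }

    // With lastDir = "dgraph.2", dgraph.3 and dgraph.4 are skipped as too new, and the
    // reverse walk stops at the full backup in dgraph.1, leaving exactly dgraph.1 and
    // dgraph.2, in that order.
    got, err := filterManifests(files, "dgraph.2")
    if err != nil {
        t.Fatal(err)
    }
    if len(got) != 2 || got[0] != files[0] || got[1] != files[1] {
        t.Fatalf("unexpected selection: %+v", got)
    }
}
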
28 changes: 12 additions & 16 deletions ee/backup/run.go
@@ -25,7 +25,6 @@ import (
"github.com/dgraph-io/badger/options"
"github.com/dgraph-io/dgraph/protos/pb"
"github.com/dgraph-io/dgraph/x"
"github.com/golang/glog"
"github.com/pkg/errors"
"github.com/spf13/cobra"
"google.golang.org/grpc"
@@ -38,7 +37,7 @@ var Restore x.SubCommand
var LsBackup x.SubCommand

var opt struct {
location, pdir, zero string
lastDir, location, pdir, zero string
}

func init() {
@@ -108,6 +107,10 @@ $ dgraph restore -p . -l /var/backups/dgraph -z localhost:5080
flag.StringVarP(&opt.pdir, "postings", "p", "",
"Directory where posting lists are stored (required).")
flag.StringVarP(&opt.zero, "zero", "z", "", "gRPC address for Dgraph zero. ex: localhost:5080")
flag.StringVarP(&opt.lastDir, "last_dir", "", "", "The latest backup folder to consider during "+
"a restore operation. Useful for partial backups or to backup a previous series of "+
"backups. Only the name of the directory (e.g dgraph.1234567890) is required. An empty "+
"value is equivalent to setting it to the latest backup directory.")
_ = Restore.Cmd.MarkFlagRequired("postings")
_ = Restore.Cmd.MarkFlagRequired("location")
}
@@ -188,16 +191,14 @@ func runRestoreCmd() error {
}

start = time.Now()
version, err := RunRestore(opt.pdir, opt.location)
version, err := RunRestore(opt.pdir, opt.location, opt.lastDir)
if err != nil {
return err
}
if version == 0 {
return errors.Errorf("Failed to obtain a restore version")
}
if glog.V(2) {
fmt.Printf("Restore version: %d\n", version)
}
fmt.Printf("Restore version: %d\n", version)

if zc != nil {
ctx, cancel := context.WithTimeout(context.Background(), time.Minute)
@@ -214,19 +215,16 @@
}

// RunRestore calls badger.Load and tries to load data into a new DB.
func RunRestore(pdir, location string) (uint64, error) {
func RunRestore(pdir, location, lastDir string) (uint64, error) {
bo := badger.DefaultOptions
bo.SyncWrites = true
bo.TableLoadingMode = options.MemoryMap
bo.ValueThreshold = 1 << 10
bo.NumVersionsToKeep = math.MaxInt32
if !glog.V(2) {
bo.Logger = nil
}

// Scan location for backup files and load them. Each file represents a node group,
// and we create a new p dir for each.
return Load(location, func(r io.Reader, groupId int) error {
return Load(location, lastDir, func(r io.Reader, groupId int) error {
bo := bo
bo.Dir = filepath.Join(pdir, fmt.Sprintf("p%d", groupId))
bo.ValueDir = bo.Dir
@@ -235,11 +233,9 @@ func RunRestore(pdir, location string) (uint64, error) {
return err
}
defer db.Close()
if glog.V(2) {
fmt.Printf("Restoring groupId: %d\n", groupId)
if !pathExist(bo.Dir) {
fmt.Println("Creating new db:", bo.Dir)
}
fmt.Printf("Restoring groupId: %d\n", groupId)
if !pathExist(bo.Dir) {
fmt.Println("Creating new db:", bo.Dir)
}
return db.Load(r, 16)
})
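
A hedged usage sketch of the updated RunRestore signature, assuming the ee/backup import path; the postings path, backup location, and directory name are placeholders. Passing an empty lastDir restores the whole series up to the latest backup, matching the flag's default. The CLI equivalent would be along the lines of: dgraph restore -p . -l /var/backups/dgraph -z localhost:5080 --last_dir dgraph.1234567890

package main

import (
    "fmt"
    "log"

    "github.com/dgraph-io/dgraph/ee/backup"
)

func main() {
    // Restores the backup series that ends at the dgraph.1234567890 directory,
    // creating a p<groupId> directory under "." for each restored group.
    version, err := backup.RunRestore(".", "/var/backups/dgraph", "dgraph.1234567890")
    if err != nil {
        log.Fatal(err)
    }
    fmt.Println("Restore version:", version)
}
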
48 changes: 31 additions & 17 deletions ee/backup/s3_handler.go
@@ -239,52 +239,66 @@ func (h *s3Handler) readManifest(mc *minio.Client, object string, m *Manifest) e
// Load creates a new session, scans for backup objects in a bucket, then tries to
// load any backup objects found.
// Returns the maximum Since value and a nil error on success, an error otherwise.
func (h *s3Handler) Load(uri *url.URL, fn loadFn) (uint64, error) {
func (h *s3Handler) Load(uri *url.URL, lastDir string, fn loadFn) (uint64, error) {
mc, err := h.setup(uri)
if err != nil {
return 0, err
}

var manifests []string
var manifestPaths []string

doneCh := make(chan struct{})
defer close(doneCh)

suffix := "/" + backupManifest
for object := range mc.ListObjects(h.bucketName, h.objectPrefix, true, doneCh) {
if strings.HasSuffix(object.Key, suffix) {
manifests = append(manifests, object.Key)
manifestPaths = append(manifestPaths, object.Key)
}
}
if len(manifests) == 0 {
if len(manifestPaths) == 0 {
return 0, errors.Errorf("No manifests found at: %s", uri.String())
}
sort.Strings(manifests)
sort.Strings(manifestPaths)
if glog.V(3) {
fmt.Printf("Found backup manifest(s) %s: %v\n", uri.Scheme, manifests)
fmt.Printf("Found backup manifest(s) %s: %v\n", uri.Scheme, manifestPaths)
}

// since is returned with the max manifest Since value found.
var since uint64

// Read and filter the files to get the list of files to consider
// for this restore operation.
var files []*manifestFile
for _, path := range manifestPaths {
var m Manifest
if err := h.readManifest(mc, path, &m); err != nil {
return 0, errors.Wrapf(err, "While reading %q", path)
}
files = append(files, &manifestFile{
path: path,
manifest: &m,
})
}
files, err = filterManifests(files, lastDir)
if err != nil {
return 0, err
}

// Process each manifest, first check that they are valid and then confirm the
// backup files for each group exist. Each group in a manifest must have a backup file;
// otherwise this is a failure and the user must remedy it.
for _, manifest := range manifests {
var m Manifest
if err := h.readManifest(mc, manifest, &m); err != nil {
return 0, errors.Wrapf(err, "While reading %q", manifest)
}
if m.Since == 0 || len(m.Groups) == 0 {
for i, f := range files {
if f.manifest.Since == 0 || len(f.manifest.Groups) == 0 {
if glog.V(2) {
fmt.Printf("Restore: skip backup: %s: %#v\n", manifest, &m)
fmt.Printf("Restore: skip backup: %s: %#v\n", f.path, f.manifest)
}
continue
}

path := filepath.Dir(manifest)
for _, groupId := range m.Groups {
object := filepath.Join(path, fmt.Sprintf(backupNameFmt, m.Since, groupId))
path := filepath.Dir(manifestPaths[i])
for _, groupId := range f.manifest.Groups {
object := filepath.Join(path, fmt.Sprintf(backupNameFmt, f.manifest.Since, groupId))
reader, err := mc.GetObject(h.bucketName, object, minio.GetObjectOptions{})
if err != nil {
return 0, errors.Wrapf(err, "Failed to get %q", object)
@@ -302,7 +316,7 @@ func (h *s3Handler) Load(uri *url.URL, fn loadFn) (uint64, error) {
return 0, err
}
}
since = m.Since
since = f.manifest.Since
}
return since, nil
}
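
And a hedged sketch of driving a restore from S3 through the exported Load entry point; the bucket URI and directory name are placeholders, and the loadFn here only counts bytes to show the plumbing rather than writing to Badger.

package main

import (
    "io"
    "io/ioutil"
    "log"

    "github.com/dgraph-io/dgraph/ee/backup"
)

func main() {
    // lastDir limits the restore to backup directories up to and including this one.
    since, err := backup.Load("s3://s3.us-west-2.amazonaws.com/my-bucket/dgraph-backups",
        "dgraph.1234567890",
        func(r io.Reader, groupId int) error {
            n, err := io.Copy(ioutil.Discard, r)
            log.Printf("group %d: %d bytes", groupId, n)
            return err
        })
    if err != nil {
        log.Fatal(err)
    }
    log.Println("Max since:", since)
}
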