Allow partial restores and restoring different backup series. (hypermodeinc#3547)

Add a last_dir flag to the restore command indicating that any backup directories created after that directory should be ignored. This enables partial restores, as well as restores of a different backup series, in backup locations that contain multiple series of full and incremental backups.
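
For illustration, a restore limited to an earlier backup series could look like the invocation below. The directory name dgraph.1234567890 is a placeholder taken from the flag's help text, and the other flags mirror the existing restore example in run.go.

$ dgraph restore -p . -l /var/backups/dgraph -z localhost:5080 --last_dir dgraph.1234567890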
martinmr authored and dna2github committed Jul 19, 2019
1 parent f2e06ea commit 0d299be
Showing 7 changed files with 125 additions and 96 deletions.
1 change: 1 addition & 0 deletions ee/backup/backup.go
@@ -42,6 +42,7 @@ type Manifest struct {
Type string `json:"type"` // Either full or incremental.
Since uint64 `json:"since"`
Groups []uint32 `json:"groups"`
Path string `json:"-"`
}

// WriteBackup uses the request values to create a stream writer then hand off the data
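As an aside on the new Path field above: the json:"-" tag keeps it out of the serialized manifest, since the path is only filled in while reading manifests during a restore. A minimal, self-contained sketch (the struct is a trimmed copy, not the real type) showing that behavior:

package main

import (
    "encoding/json"
    "fmt"
)

// manifest is a trimmed copy of the Manifest struct above, kept here only to
// show the effect of the json:"-" tag on the new Path field.
type manifest struct {
    Type   string   `json:"type"`
    Since  uint64   `json:"since"`
    Groups []uint32 `json:"groups"`
    Path   string   `json:"-"`
}

func main() {
    m := manifest{Type: "full", Since: 42, Groups: []uint32{1}, Path: "/backups/dgraph.1/manifest.json"}
    out, _ := json.Marshal(m)
    // Prints {"type":"full","since":42,"groups":[1]}; Path never reaches manifest.json.
    fmt.Println(string(out))
}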
25 changes: 13 additions & 12 deletions ee/backup/file_handler.go
@@ -111,34 +111,35 @@ func (h *fileHandler) CreateManifest(uri *url.URL, req *pb.BackupRequest) error

// Load tries to load any backup files found.
// Returns the maximum value of Since on success, error otherwise.
func (h *fileHandler) Load(uri *url.URL, fn loadFn) (uint64, error) {
func (h *fileHandler) Load(uri *url.URL, lastDir string, fn loadFn) (uint64, error) {
if !pathExist(uri.Path) {
return 0, errors.Errorf("The path %q does not exist or it is inaccessible.", uri.Path)
}

suffix := filepath.Join(string(filepath.Separator), backupManifest)
manifestPaths := x.WalkPathFunc(uri.Path, func(path string, isdir bool) bool {
paths := x.WalkPathFunc(uri.Path, func(path string, isdir bool) bool {
return !isdir && strings.HasSuffix(path, suffix)
})
if len(manifestPaths) == 0 {
if len(paths) == 0 {
return 0, errors.Errorf("No manifests found at path: %s", uri.Path)
}
sort.Strings(manifestPaths)
sort.Strings(paths)
if glog.V(3) {
fmt.Printf("Found backup manifest(s): %v\n", manifestPaths)
fmt.Printf("Found backup manifest(s): %v\n", paths)
}

// Read and filter the manifests to get the list of manifests to consider
// Read and filter the files to get the list of files to consider
// for this restore operation.
var manifests []*Manifest
for _, manifest := range manifestPaths {
for _, path := range paths {
var m Manifest
if err := h.readManifest(manifest, &m); err != nil {
return 0, errors.Wrapf(err, "While reading %q", manifest)
if err := h.readManifest(path, &m); err != nil {
return 0, errors.Wrapf(err, "While reading %q", path)
}
m.Path = path
manifests = append(manifests, &m)
}
manifests, manifestPaths, err := filterManifests(manifests, manifestPaths)
manifests, err := filterManifests(manifests, lastDir)
if err != nil {
return 0, err
}
@@ -150,12 +151,12 @@ func (h *fileHandler) Load(uri *url.URL, fn loadFn) (uint64, error) {
for i, manifest := range manifests {
if manifest.Since == 0 || len(manifest.Groups) == 0 {
if glog.V(2) {
fmt.Printf("Restore: skip backup: %s: %#v\n", manifestPaths[i], manifest)
fmt.Printf("Restore: skip backup: %#v\n", manifest)
}
continue
}

path := filepath.Dir(manifestPaths[i])
path := filepath.Dir(manifests[i].Path)
for _, groupId := range manifest.Groups {
file := filepath.Join(path, backupName(manifest.Since, groupId))
fp, err := os.Open(file)
36 changes: 20 additions & 16 deletions ee/backup/handler.go
@@ -16,6 +16,7 @@ import (
"fmt"
"io"
"net/url"
"strings"

"github.com/dgraph-io/dgraph/protos/pb"

@@ -70,9 +71,11 @@ type UriHandler interface {
CreateManifest(*url.URL, *pb.BackupRequest) error

// Load will scan location URI for backup files, then load them via loadFn.
// It optionally takes the name of the last directory to consider. Any backup directories
// created after it will be ignored.
// Objects implementing this function will be used for retrieving (download) backup files
// and loading the data into a DB. The restore CLI command uses this call.
Load(*url.URL, loadFn) (uint64, error)
Load(*url.URL, string, loadFn) (uint64, error)

// ListManifests will scan the provided URI and return the paths to the manifests stored
// in that location.
@@ -130,10 +133,11 @@ func NewUriHandler(uri *url.URL) (UriHandler, error) {
// A reader and the backup groupId are passed as arguments.
type loadFn func(reader io.Reader, groupId int) error

// Load will scan location l for backup files, then load them sequentially through reader.
// Load will scan location l for backup files (not including any directories
// created after lastDir), then load them sequentially through reader.
// Returns the maximum Since value on success, otherwise an error.
func Load(l string, fn loadFn) (since uint64, err error) {
uri, err := url.Parse(l)
func Load(location, lastDir string, fn loadFn) (since uint64, err error) {
uri, err := url.Parse(location)
if err != nil {
return 0, err
}
@@ -143,7 +147,7 @@ func Load(l string, fn loadFn) (since uint64, err error) {
return 0, errors.Errorf("Unsupported URI: %v", uri)
}

return h.Load(uri, fn)
return h.Load(uri, lastDir, fn)
}

// ListManifests scans location l for backup files and returns the list of manifests.
@@ -175,19 +179,20 @@ func ListManifests(l string) (map[string]*Manifest, error) {
return listedManifests, nil
}

// filterManifests takes a list of manifests, their paths, and returns the list of manifests
// filterManifests takes a list of manifests and returns the list of manifests
// that should be considered during a restore.
func filterManifests(manifests []*Manifest, paths []string) ([]*Manifest, []string, error) {
if len(manifests) != len(paths) {
return nil, nil, errors.Errorf("lengths of manifest and paths slice differ")
}

// Go through the manifests in reverse order and stop when the latest full backup is found.
func filterManifests(manifests []*Manifest, lastDir string) ([]*Manifest, error) {
// Go through the files in reverse order and stop when the latest full backup is found.
var filteredManifests []*Manifest
var filteredPaths []string
for i := len(manifests) - 1; i >= 0; i-- {
parts := strings.Split(manifests[i].Path, "/")
dir := parts[len(parts)-2]
if len(lastDir) > 0 && dir > lastDir {
fmt.Printf("Restore: skip directory %s because it's newer than %s.\n", dir, lastDir)
continue
}

filteredManifests = append(filteredManifests, manifests[i])
filteredPaths = append(filteredPaths, paths[i])
if manifests[i].Type == "full" {
break
}
@@ -197,10 +202,9 @@ func filterManifests(manifests []*Manifest, paths []string) ([]*Manifest, []string, error) {
for i := len(filteredManifests)/2 - 1; i >= 0; i-- {
opp := len(filteredManifests) - 1 - i
filteredManifests[i], filteredManifests[opp] = filteredManifests[opp], filteredManifests[i]
filteredPaths[i], filteredPaths[opp] = filteredPaths[opp], filteredPaths[i]
}

return filteredManifests, filteredPaths, nil
return filteredManifests, nil
}

func backupName(since uint64, groupId uint32) string {
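To make the filtering above concrete, here is a self-contained sketch of the same walk: manifests are visited newest-first, any manifest whose parent directory sorts after lastDir is skipped, and the walk stops at the most recent full backup that was kept. The directory names are hypothetical and the struct is a trimmed copy of the real Manifest.

package main

import (
    "fmt"
    "strings"
)

// Manifest carries only the fields this sketch needs.
type Manifest struct {
    Type string // "full" or "incremental"
    Path string // path to the manifest file inside its backup directory
}

// filterByLastDir mirrors the logic of filterManifests above, without error handling.
func filterByLastDir(manifests []*Manifest, lastDir string) []*Manifest {
    var out []*Manifest
    for i := len(manifests) - 1; i >= 0; i-- {
        parts := strings.Split(manifests[i].Path, "/")
        dir := parts[len(parts)-2]
        if lastDir != "" && dir > lastDir {
            continue // directory is newer than lastDir, so it is ignored.
        }
        out = append(out, manifests[i])
        if manifests[i].Type == "full" {
            break // the series starts at the latest full backup that survived the filter.
        }
    }
    // Reverse so the backups are applied oldest-first.
    for i, j := 0, len(out)-1; i < j; i, j = i+1, j-1 {
        out[i], out[j] = out[j], out[i]
    }
    return out
}

func main() {
    ms := []*Manifest{
        {Type: "full", Path: "backups/dgraph.1000/manifest.json"},
        {Type: "incremental", Path: "backups/dgraph.2000/manifest.json"},
        {Type: "full", Path: "backups/dgraph.3000/manifest.json"},
    }
    // With lastDir set to dgraph.2000, the dgraph.3000 series is ignored and the
    // restore uses the dgraph.1000 full backup plus the dgraph.2000 incremental.
    for _, m := range filterByLastDir(ms, "dgraph.2000") {
        fmt.Println(m.Type, m.Path)
    }
}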
28 changes: 12 additions & 16 deletions ee/backup/run.go
Expand Up @@ -26,7 +26,6 @@ import (
"github.com/dgraph-io/badger/options"
"github.com/dgraph-io/dgraph/protos/pb"
"github.com/dgraph-io/dgraph/x"
"github.com/golang/glog"
"github.com/pkg/errors"
"github.com/spf13/cobra"
"google.golang.org/grpc"
@@ -39,7 +38,7 @@ var Restore x.SubCommand
var LsBackup x.SubCommand

var opt struct {
location, pdir, zero string
lastDir, location, pdir, zero string
}

func init() {
@@ -109,6 +108,10 @@ $ dgraph restore -p . -l /var/backups/dgraph -z localhost:5080
flag.StringVarP(&opt.pdir, "postings", "p", "",
"Directory where posting lists are stored (required).")
flag.StringVarP(&opt.zero, "zero", "z", "", "gRPC address for Dgraph zero. ex: localhost:5080")
flag.StringVarP(&opt.lastDir, "last_dir", "", "", "The latest backup folder to consider during "+
"a restore operation. Useful for partial restores or for restoring a previous series of "+
"backups. Only the name of the directory (e.g. dgraph.1234567890) is required. An empty "+
"value is equivalent to setting it to the latest backup directory.")
_ = Restore.Cmd.MarkFlagRequired("postings")
_ = Restore.Cmd.MarkFlagRequired("location")
}
@@ -189,16 +192,14 @@ func runRestoreCmd() error {
}

start = time.Now()
version, err := RunRestore(opt.pdir, opt.location)
version, err := RunRestore(opt.pdir, opt.location, opt.lastDir)
if err != nil {
return err
}
if version == 0 {
return errors.Errorf("Failed to obtain a restore version")
}
if glog.V(2) {
fmt.Printf("Restore version: %d\n", version)
}
fmt.Printf("Restore version: %d\n", version)

if zc != nil {
ctx, cancel := context.WithTimeout(context.Background(), time.Minute)
@@ -215,19 +216,16 @@ func runRestoreCmd() error {
}

// RunRestore calls badger.Load and tries to load data into a new DB.
func RunRestore(pdir, location string) (uint64, error) {
func RunRestore(pdir, location, lastDir string) (uint64, error) {
bo := badger.DefaultOptions
bo.SyncWrites = true
bo.TableLoadingMode = options.MemoryMap
bo.ValueThreshold = 1 << 10
bo.NumVersionsToKeep = math.MaxInt32
if !glog.V(2) {
bo.Logger = nil
}

// Scan location for backup files and load them. Each file represents a node group,
// and we create a new p dir for each.
return Load(location, func(r io.Reader, groupId int) error {
return Load(location, lastDir, func(r io.Reader, groupId int) error {
bo := bo
bo.Dir = filepath.Join(pdir, fmt.Sprintf("p%d", groupId))
bo.ValueDir = bo.Dir
@@ -236,11 +234,9 @@ func RunRestore(pdir, location string) (uint64, error) {
return err
}
defer db.Close()
if glog.V(2) {
fmt.Printf("Restoring groupId: %d\n", groupId)
if !pathExist(bo.Dir) {
fmt.Println("Creating new db:", bo.Dir)
}
fmt.Printf("Restoring groupId: %d\n", groupId)
if !pathExist(bo.Dir) {
fmt.Println("Creating new db:", bo.Dir)
}
gzReader, err := gzip.NewReader(r)
if err != nil {
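The updated RunRestore above can also be driven programmatically. A hedged sketch follows; the import path for the enterprise backup package and the directory name are assumptions, and the paths are placeholders.

package main

import (
    "fmt"
    "log"

    // Assumed import path for the package containing RunRestore (ee/backup in this repository).
    "github.com/dgraph-io/dgraph/ee/backup"
)

func main() {
    // pdir is where the restored posting lists go, location holds the backup files,
    // and lastDir limits the restore to directories up to and including dgraph.1234567890.
    version, err := backup.RunRestore("./p", "/var/backups/dgraph", "dgraph.1234567890")
    if err != nil {
        log.Fatal(err)
    }
    fmt.Printf("Restore version: %d\n", version)
}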
37 changes: 19 additions & 18 deletions ee/backup/s3_handler.go
Expand Up @@ -239,29 +239,29 @@ func (h *s3Handler) readManifest(mc *minio.Client, object string, m *Manifest) e
// Load creates a new session, scans for backup objects in a bucket, then tries to
// load any backup objects found.
// Returns nil and the maximum Since value on success, error otherwise.
func (h *s3Handler) Load(uri *url.URL, fn loadFn) (uint64, error) {
func (h *s3Handler) Load(uri *url.URL, lastDir string, fn loadFn) (uint64, error) {
mc, err := h.setup(uri)
if err != nil {
return 0, err
}

var manifestPaths []string
var paths []string

doneCh := make(chan struct{})
defer close(doneCh)

suffix := "/" + backupManifest
for object := range mc.ListObjects(h.bucketName, h.objectPrefix, true, doneCh) {
if strings.HasSuffix(object.Key, suffix) {
manifestPaths = append(manifestPaths, object.Key)
paths = append(paths, object.Key)
}
}
if len(manifestPaths) == 0 {
if len(paths) == 0 {
return 0, errors.Errorf("No manifests found at: %s", uri.String())
}
sort.Strings(manifestPaths)
sort.Strings(paths)
if glog.V(3) {
fmt.Printf("Found backup manifest(s) %s: %v\n", uri.Scheme, manifestPaths)
fmt.Printf("Found backup manifest(s) %s: %v\n", uri.Scheme, paths)
}

// since is returned with the max manifest Since value found.
@@ -270,32 +270,33 @@ func (h *s3Handler) Load(uri *url.URL, fn loadFn) (uint64, error) {
// Read and filter the manifests to get the list of manifests to consider
// for this restore operation.
var manifests []*Manifest
for _, manifest := range manifestPaths {
for _, path := range paths {
var m Manifest
if err := h.readManifest(mc, manifest, &m); err != nil {
return 0, errors.Wrapf(err, "While reading %q", manifest)
if err := h.readManifest(mc, path, &m); err != nil {
return 0, errors.Wrapf(err, "While reading %q", path)
}
m.Path = path
manifests = append(manifests, &m)
}
manifests, manifestPaths, err = filterManifests(manifests, manifestPaths)
manifests, err = filterManifests(manifests, lastDir)
if err != nil {
return 0, err
}

// Process each manifest, first check that they are valid and then confirm the
// backup files for each group exist. Each group in manifest must have a backup file,
// backup manifests for each group exist. Each group in manifest must have a backup file,
// otherwise this is a failure and the user must remedy.
for i, m := range manifests {
if m.Since == 0 || len(m.Groups) == 0 {
for i, manifest := range manifests {
if manifest.Since == 0 || len(manifest.Groups) == 0 {
if glog.V(2) {
fmt.Printf("Restore: skip backup: %s: %#v\n", manifestPaths[i], m)
fmt.Printf("Restore: skip backup: %#v\n", manifest)
}
continue
}

path := filepath.Dir(manifestPaths[i])
for _, groupId := range m.Groups {
object := filepath.Join(path, backupName(m.Since, groupId))
path := filepath.Dir(manifests[i].Path)
for _, groupId := range manifest.Groups {
object := filepath.Join(path, backupName(manifest.Since, groupId))
reader, err := mc.GetObject(h.bucketName, object, minio.GetObjectOptions{})
if err != nil {
return 0, errors.Wrapf(err, "Failed to get %q", object)
@@ -313,7 +314,7 @@ func (h *s3Handler) Load(uri *url.URL, fn loadFn) (uint64, error) {
return 0, err
}
}
since = m.Since
since = manifest.Since
}
return since, nil
}