Skip to content

Commit

Permalink
More refactoring of backup code. (#3515)
Browse files Browse the repository at this point in the history
  • Loading branch information
martinmr authored Jun 7, 2019
1 parent 9cac881 commit 381e4d7
Show file tree
Hide file tree
Showing 10 changed files with 534 additions and 556 deletions.
45 changes: 28 additions & 17 deletions dgraph/cmd/alpha/admin_backup.go
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,7 @@ package alpha
import (
"context"
"net/http"
"net/url"
"time"

"github.com/dgraph-io/dgraph/ee/backup"
Expand Down Expand Up @@ -89,39 +90,49 @@ func processHttpBackupRequest(ctx context.Context, r *http.Request) error {
SecretKey: secretKey,
SessionToken: sessionToken,
Anonymous: anonymous,
// TODO(martinmr): Check if this field can be removed.
ForceFull: forceFull,
}
m := backup.Manifest{Groups: worker.KnownGroups()}
glog.Infof("Created backup request: %s. Groups=%v\n", &req, m.Groups)

// Read the manifests to get the right timestamp from which to start the backup.
uri, err := url.Parse(req.Destination)
if err != nil {
return err
}
handler, err := backup.NewUriHandler(uri)
if err != nil {
return err
}
req.SinceTs, err = handler.GetSinceTs(uri)
if err != nil {
return err
}
if forceFull {
req.SinceTs = 0
}

groups := worker.KnownGroups()
glog.Infof("Created backup request: %s. Groups=%v\n", &req, groups)
ctx, cancel := context.WithCancel(ctx)
defer cancel()

errCh := make(chan error, len(m.Groups))
for _, gid := range m.Groups {
errCh := make(chan error, len(groups))
for _, gid := range groups {
req := req
req.GroupId = gid
go func(req *pb.BackupRequest) {
res, err := worker.BackupGroup(ctx, req)
_, err := worker.BackupGroup(ctx, req)
errCh <- err

// Update manifest if appropriate.
m.Lock()
if res.Since > m.Since {
m.Since = res.Since
}
m.Unlock()
}(&req)
}

for range m.Groups {
for range groups {
if err := <-errCh; err != nil {
glog.Errorf("Error received during backup: %v", err)
return err
}
}

br := &backup.Request{Backup: &req, Manifest: &m}
return br.Complete(ctx)
m := backup.Manifest{Groups: groups}
m.Since = req.ReadTs
bp := &backup.Processor{Request: &req}
return bp.CompleteBackup(ctx, &m)
}
119 changes: 66 additions & 53 deletions ee/backup/backup.go
Original file line number Diff line number Diff line change
Expand Up @@ -16,101 +16,114 @@ import (
"context"
"encoding/json"
"fmt"
"net/url"
"sync"

"github.com/dgraph-io/badger"
"github.com/dgraph-io/dgraph/protos/pb"
"github.com/golang/glog"
)

// Request has all the information needed to perform a backup.
type Request struct {
DB *badger.DB // Badger pstore managed by this node.
Backup *pb.BackupRequest
Manifest *Manifest
// Processor handles the different stages of the backup process: writing the
// backup data (WriteBackup) and writing the manifest (CompleteBackup).
type Processor struct {
// DB is the Badger pstore managed by this node. WriteBackup streams the
// backup data out of it.
DB *badger.DB
// Request stores the backup request containing the parameters for this backup,
// such as the destination, the read and since timestamps, and the group ID.
Request *pb.BackupRequest
}

// Since indicates the beginning timestamp from which the backup should start.
// For a partial backup, the value is the largest value from the previous manifest
// files. For a full backup, Since is set to zero so that all data is included.
Since uint64
// Manifest records the details of a backup; these are the values used during
// restore.
type Manifest struct {
// Mutex is embedded so that the manifest can be locked directly via
// m.Lock() and m.Unlock().
sync.Mutex
// Since is the timestamp from which the next incremental backup should
// start (it's set to the readTs of the current backup).
Since uint64 `json:"since"`
// Groups are the IDs of the groups involved.
Groups []uint32 `json:"groups"`
}

// Process uses the request values to create a stream writer then hand off the data
// WriteBackup uses the request values to create a stream writer then hand off the data
// retrieval to stream.Orchestrate. The writer will create all the fd's needed to
// collect the data and later move to the target.
// Returns errors on failure, nil on success.
func (r *Request) Process(ctx context.Context) (*pb.BackupResponse, error) {
var res pb.BackupResponse
func (pr *Processor) WriteBackup(ctx context.Context) (*pb.BackupResponse, error) {
var emptyRes pb.BackupResponse

if err := ctx.Err(); err != nil {
return nil, err
}

handler, err := r.newHandler()
uri, err := url.Parse(pr.Request.Destination)
if err != nil {
return nil, err
return &emptyRes, err
}

handler, err := NewUriHandler(uri)
if err != nil {
return &emptyRes, err
}

if err := handler.CreateBackupFile(uri, pr.Request); err != nil {
return &emptyRes, err
}
glog.V(3).Infof("Backup manifest version: %d", r.Since)

stream := r.DB.NewStreamAt(r.Backup.ReadTs)
glog.V(3).Infof("Backup manifest version: %d", pr.Request.SinceTs)

stream := pr.DB.NewStreamAt(pr.Request.ReadTs)
stream.LogPrefix = "Dgraph.Backup"
res.Since, err = stream.Backup(handler, r.Since)
newSince, err := stream.Backup(handler, pr.Request.SinceTs)

if err != nil {
glog.Errorf("While taking backup: %v", err)
return nil, err
return &emptyRes, err
}

if newSince > pr.Request.ReadTs {
glog.Errorf("Max timestamp seen during backup (%d) is greater than readTs (%d)",
newSince, pr.Request.ReadTs)
}
glog.V(2).Infof("Backup group %d version: %d", r.Backup.GroupId, res.Since)

glog.V(2).Infof("Backup group %d version: %d", pr.Request.GroupId, pr.Request.ReadTs)
if err = handler.Close(); err != nil {
glog.Errorf("While closing handler: %v", err)
return nil, err
return &emptyRes, err
}
glog.Infof("Backup complete: group %d at %d", r.Backup.GroupId, r.Backup.ReadTs)
return &res, nil
glog.Infof("Backup complete: group %d at %d", pr.Request.GroupId, pr.Request.ReadTs)
return &emptyRes, nil
}

// Manifest records backup details; these are the values used during restore.
type Manifest struct {
// Mutex is embedded so that the manifest can be locked directly via
// m.Lock() and m.Unlock() while Since is updated from the per-group
// backup goroutines.
sync.Mutex
// Since is the timestamp from which the next incremental backup should start.
Since uint64 `json:"since"`
// ReadTs is the original backup request timestamp.
ReadTs uint64 `json:"read_ts"`
// Groups are the IDs of the groups involved.
Groups []uint32 `json:"groups"`
}

// ManifestStatus combines a manifest with other information about it that
// should not be inside the Manifest struct, since that information should not
// be recorded in manifest files.
type ManifestStatus struct {
// Manifest is embedded, so its fields and methods are promoted onto
// ManifestStatus.
*Manifest
// FileName is presumably the name of the file the manifest was read
// from — TODO confirm against the code that populates it.
FileName string
}

// GoString implements the fmt.GoStringer interface for Manifest, so that the
// %#v verb prints the Since, ReadTs and Groups fields in a compact form.
func (m *Manifest) GoString() string {
	const layout = `Manifest{Since: %d, ReadTs: %d, Groups: %v}`
	return fmt.Sprintf(layout, m.Since, m.ReadTs, m.Groups)
}

// Complete will finalize a backup by writing the manifest at the backup destination.
func (r *Request) Complete(ctx context.Context) error {
// CompleteBackup will finalize a backup by writing the manifest at the backup destination.
func (pr *Processor) CompleteBackup(ctx context.Context, manifest *Manifest) error {
if err := ctx.Err(); err != nil {
return err
}
handler, err := r.newHandler()

uri, err := url.Parse(pr.Request.Destination)
if err != nil {
return err
}
if r.Manifest.ReadTs == 0 {
r.Manifest.ReadTs = r.Backup.ReadTs

handler, err := NewUriHandler(uri)
if err != nil {
return err
}

if err := handler.CreateManifest(uri, pr.Request); err != nil {
return err
}
if err = json.NewEncoder(handler).Encode(r.Manifest); err != nil {

if err = json.NewEncoder(handler).Encode(manifest); err != nil {
return err
}

if err = handler.Close(); err != nil {
return err
}
glog.Infof("Backup completed OK.")
return nil
}

// GoString implements the fmt.GoStringer interface for Manifest, so that the
// %#v verb prints the Since and Groups fields in a compact form.
func (m *Manifest) GoString() string {
	const layout = `Manifest{Since: %d, Groups: %v}`
	return fmt.Sprintf(layout, m.Since, m.Groups)
}
94 changes: 52 additions & 42 deletions ee/backup/file_handler.go
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,7 @@ import (
"sort"
"strings"

"github.com/dgraph-io/dgraph/protos/pb"
"github.com/dgraph-io/dgraph/x"

"github.com/golang/glog"
Expand All @@ -43,60 +44,69 @@ func (h *fileHandler) readManifest(path string, m *Manifest) error {
return json.Unmarshal(b, m)
}

// Create prepares a path to save backup files.
// Returns error on failure, nil on success.
func (h *fileHandler) Create(uri *url.URL, req *Request) error {
var dir, path, fileName string
func (h *fileHandler) createFiles(uri *url.URL, req *pb.BackupRequest, fileName string) error {
var dir, path string

if !pathExist(uri.Path) {
return errors.Errorf("The path %q does not exist or it is inaccessible.", uri.Path)
dir = filepath.Join(uri.Path, fmt.Sprintf(backupPathFmt, req.UnixTs))
err := os.Mkdir(dir, 0700)
if err != nil && !os.IsExist(err) {
return err
}

// Find the max Since value from the latest backup. This is done only when starting
// a new backup, not when creating a manifest.
if req.Manifest == nil {
var lastManifest string
suffix := filepath.Join(string(filepath.Separator), backupManifest)
_ = x.WalkPathFunc(uri.Path, func(path string, isdir bool) bool {
if !isdir && strings.HasSuffix(path, suffix) && path > lastManifest {
lastManifest = path
}
return false
})
path = filepath.Join(dir, fileName)
h.fp, err = os.Create(path)
if err != nil {
return err
}
glog.V(2).Infof("Using file path: %q", path)
return nil
}

if lastManifest != "" {
var m Manifest
if err := h.readManifest(lastManifest, &m); err != nil {
return err
}
// GetSinceTs reads the manifests at the given URL and returns the appropriate
// timestamp from which the current backup should be started.
func (h *fileHandler) GetSinceTs(uri *url.URL) (uint64, error) {
if !pathExist(uri.Path) {
return 0, errors.Errorf("The path %q does not exist or it is inaccessible.", uri.Path)
}

req.Since = m.Since
// Find the max Since value from the latest backup.
var lastManifest string
suffix := filepath.Join(string(filepath.Separator), backupManifest)
_ = x.WalkPathFunc(uri.Path, func(path string, isdir bool) bool {
if !isdir && strings.HasSuffix(path, suffix) && path > lastManifest {
lastManifest = path
}
fileName = fmt.Sprintf(backupNameFmt, req.Backup.ReadTs, req.Backup.GroupId)
} else {
fileName = backupManifest
return false
})

if lastManifest == "" {
return 0, nil
}

// If a full backup is being forced, force Since to zero to stream all
// the contents from the database.
if req.Backup.ForceFull {
req.Since = 0
var m Manifest
if err := h.readManifest(lastManifest, &m); err != nil {
return 0, err
}
return m.Since, nil
}

dir = filepath.Join(uri.Path, fmt.Sprintf(backupPathFmt, req.Backup.UnixTs))
err := os.Mkdir(dir, 0700)
if err != nil && !os.IsExist(err) {
return err
// CreateBackupFile prepares the path to save the backup file.
func (h *fileHandler) CreateBackupFile(uri *url.URL, req *pb.BackupRequest) error {
if !pathExist(uri.Path) {
return errors.Errorf("The path %q does not exist or it is inaccessible.", uri.Path)
}

path = filepath.Join(dir, fileName)
h.fp, err = os.Create(path)
if err != nil {
return err
fileName := fmt.Sprintf(backupNameFmt, req.ReadTs, req.GroupId)
return h.createFiles(uri, req, fileName)
}

// CreateManifest completes the backup by writing the manifest to a file.
func (h *fileHandler) CreateManifest(uri *url.URL, req *pb.BackupRequest) error {
if !pathExist(uri.Path) {
return errors.Errorf("The path %q does not exist or it is inaccessible.", uri.Path)
}
glog.V(2).Infof("Using file path: %q", path)

return nil
return h.createFiles(uri, req, backupManifest)
}

// Load tries to load any backup files found.
Expand Down Expand Up @@ -127,7 +137,7 @@ func (h *fileHandler) Load(uri *url.URL, fn loadFn) (uint64, error) {
if err := h.readManifest(manifest, &m); err != nil {
return 0, errors.Wrapf(err, "While reading %q", manifest)
}
if m.ReadTs == 0 || m.Since == 0 || len(m.Groups) == 0 {
if m.Since == 0 || len(m.Groups) == 0 {
if glog.V(2) {
fmt.Printf("Restore: skip backup: %s: %#v\n", manifest, &m)
}
Expand All @@ -136,7 +146,7 @@ func (h *fileHandler) Load(uri *url.URL, fn loadFn) (uint64, error) {

path := filepath.Dir(manifest)
for _, groupId := range m.Groups {
file := filepath.Join(path, fmt.Sprintf(backupNameFmt, m.ReadTs, groupId))
file := filepath.Join(path, fmt.Sprintf(backupNameFmt, m.Since, groupId))
fp, err := os.Open(file)
if err != nil {
return 0, errors.Wrapf(err, "Failed to open %q", file)
Expand Down
Loading

0 comments on commit 381e4d7

Please sign in to comment.