Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

More refactoring of backup code. #3515

Merged
merged 8 commits into from
Jun 7, 2019
Merged
Show file tree
Hide file tree
Changes from 1 commit
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
18 changes: 6 additions & 12 deletions dgraph/cmd/alpha/admin_backup.go
Original file line number Diff line number Diff line change
Expand Up @@ -89,10 +89,11 @@ func processHttpBackupRequest(ctx context.Context, r *http.Request) error {
SecretKey: secretKey,
SessionToken: sessionToken,
Anonymous: anonymous,
// TODO(martinmr): Check if this field can be removed.
ForceFull: forceFull,
ForceFull: forceFull,
}

m := backup.Manifest{Groups: worker.KnownGroups()}
m.Since = req.ReadTs
glog.Infof("Created backup request: %s. Groups=%v\n", &req, m.Groups)

ctx, cancel := context.WithCancel(ctx)
Expand All @@ -103,15 +104,8 @@ func processHttpBackupRequest(ctx context.Context, r *http.Request) error {
req := req
req.GroupId = gid
go func(req *pb.BackupRequest) {
res, err := worker.BackupGroup(ctx, req)
_, err := worker.BackupGroup(ctx, req)
errCh <- err

// Update manifest if appropriate.
m.Lock()
if res.Since > m.Since {
m.Since = res.Since
}
m.Unlock()
}(&req)
}

Expand All @@ -122,6 +116,6 @@ func processHttpBackupRequest(ctx context.Context, r *http.Request) error {
}
}

br := &backup.Request{Backup: &req, Manifest: &m}
return br.Complete(ctx)
br := &backup.Request{Backup: &req}
return br.Complete(ctx, &m)
}
109 changes: 67 additions & 42 deletions ee/backup/backup.go
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,7 @@ import (
"context"
"encoding/json"
"fmt"
"net/url"
"sync"

"github.com/dgraph-io/badger"
Expand All @@ -25,92 +26,116 @@ import (

// Request has all the information needed to perform a backup.
type Request struct {
DB *badger.DB // Badger pstore managed by this node.
Backup *pb.BackupRequest
Manifest *Manifest

// DB is the Badger pstore managed by this node.
DB *badger.DB
// Backup stores the backup request containing the parameters for this backup.
Backup *pb.BackupRequest
// Since indicates the beginning timestamp from which the backup should start.
// For a partial backup, the value is the largest value from the previous manifest
// files. For a full backup, Since is set to zero so that all data is included.
Since uint64
}

// Manifest records the details of a backup; these values are used during restore
// and are written to the manifest file as JSON.
// Since is the timestamp from which the next incremental backup should start (it's set
// to the readTs of the current backup).
// Groups are the IDs of the groups involved.
type Manifest struct {
// Mutex is embedded so that holders of a Manifest can lock it directly.
sync.Mutex
// Since is stored under the JSON key "since".
Since uint64 `json:"since"`
// Groups is stored under the JSON key "groups".
Groups []uint32 `json:"groups"`
}

// ManifestStatus combines a manifest along with other information about it
// that should not be inside the Manifest struct since it should not be
// recorded in manifest files.
type ManifestStatus struct {
// Manifest is embedded by pointer, so its fields and methods are promoted.
*Manifest
// FileName is presumably the name of the file the manifest was read from —
// TODO confirm against the code that populates it.
FileName string
}

// Process uses the request values to create a stream writer then hand off the data
// retrieval to stream.Orchestrate. The writer will create all the fd's needed to
// collect the data and later move to the target.
// Returns errors on failure, nil on success.
func (r *Request) Process(ctx context.Context) (*pb.BackupResponse, error) {
var res pb.BackupResponse
var emptyRes pb.BackupResponse

if err := ctx.Err(); err != nil {
return nil, err
}

handler, err := r.newHandler()
uri, err := url.Parse(r.Backup.Destination)
if err != nil {
return nil, err
return &emptyRes, err
}

handler, err := r.newHandler(uri)
if err != nil {
return &emptyRes, err
}

if err := handler.CreateBackupFiles(uri, r); err != nil {
return &emptyRes, err
}

glog.V(3).Infof("Backup manifest version: %d", r.Since)

stream := r.DB.NewStreamAt(r.Backup.ReadTs)
stream.LogPrefix = "Dgraph.Backup"
res.Since, err = stream.Backup(handler, r.Since)
newSince, err := stream.Backup(handler, r.Since)

if err != nil {
glog.Errorf("While taking backup: %v", err)
return nil, err
return &emptyRes, err
}

if newSince > r.Backup.ReadTs {
glog.Errorf("Max timestamp seen during backup (%d) is greater than readTs (%d)",
newSince, r.Backup.ReadTs)
}
glog.V(2).Infof("Backup group %d version: %d", r.Backup.GroupId, res.Since)

glog.V(2).Infof("Backup group %d version: %d", r.Backup.GroupId, r.Backup.ReadTs)
if err = handler.Close(); err != nil {
glog.Errorf("While closing handler: %v", err)
return nil, err
return &emptyRes, err
}
glog.Infof("Backup complete: group %d at %d", r.Backup.GroupId, r.Backup.ReadTs)
return &res, nil
}

// Manifest records backup details, these are values used during restore.
// Since is the timestamp from which the next incremental backup should start.
// Groups are the IDs of the groups involved.
// ReadTs is the original backup request timestamp.
type Manifest struct {
sync.Mutex
Since uint64 `json:"since"`
ReadTs uint64 `json:"read_ts"`
Groups []uint32 `json:"groups"`
}

// ManifestStatus combines a manifest along with other information about it
// that should not be inside the Manifest struct since it should not be
// recorded in manifest files.
type ManifestStatus struct {
*Manifest
FileName string
}

// GoString implements the GoStringer interface for Manifest.
func (m *Manifest) GoString() string {
return fmt.Sprintf(`Manifest{Since: %d, ReadTs: %d, Groups: %v}`,
m.Since, m.ReadTs, m.Groups)
return &emptyRes, nil
}

// Complete will finalize a backup by writing the manifest at the backup destination.
func (r *Request) Complete(ctx context.Context) error {
func (r *Request) Complete(ctx context.Context, manifest *Manifest) error {
if err := ctx.Err(); err != nil {
return err
}
handler, err := r.newHandler()

uri, err := url.Parse(r.Backup.Destination)
if err != nil {
return err
}
if r.Manifest.ReadTs == 0 {
r.Manifest.ReadTs = r.Backup.ReadTs

handler, err := r.newHandler(uri)
if err != nil {
return err
}
if err = json.NewEncoder(handler).Encode(r.Manifest); err != nil {

if err := handler.CreateManifest(uri, r, manifest); err != nil {
return err
}

if err = json.NewEncoder(handler).Encode(manifest); err != nil {
return err
}

if err = handler.Close(); err != nil {
return err
}
glog.Infof("Backup completed OK.")
return nil
}

// GoString implements fmt.GoStringer for Manifest, so that formatting a
// *Manifest with %#v prints only the Since and Groups values.
func (m *Manifest) GoString() string {
	since, groups := m.Since, m.Groups
	return fmt.Sprintf(`Manifest{Since: %d, Groups: %v}`, since, groups)
}
85 changes: 47 additions & 38 deletions ee/backup/file_handler.go
Original file line number Diff line number Diff line change
Expand Up @@ -43,60 +43,69 @@ func (h *fileHandler) readManifest(path string, m *Manifest) error {
return json.Unmarshal(b, m)
}

// Create prepares a path to save backup files.
// Returns error on failure, nil on success.
func (h *fileHandler) Create(uri *url.URL, req *Request) error {
var dir, path, fileName string
// createFiles makes sure the directory for this backup exists under uri.Path and
// then creates the file named fileName inside it, keeping the resulting handle
// in h.fp. The directory name is built from backupPathFmt and req.Backup.UnixTs.
// Returns an error if the directory or the file cannot be created, nil on success.
func (h *fileHandler) createFiles(uri *url.URL, req *Request, fileName string) error {
	backupDir := filepath.Join(uri.Path, fmt.Sprintf(backupPathFmt, req.Backup.UnixTs))

	// A directory that already exists is not an error: several files are written
	// into the same backup directory.
	if mkErr := os.Mkdir(backupDir, 0700); mkErr != nil && !os.IsExist(mkErr) {
		return mkErr
	}

	target := filepath.Join(backupDir, fileName)
	fp, err := os.Create(target)
	// The handle is stored even when creation failed, as the caller may inspect h.fp.
	h.fp = fp
	if err != nil {
		return err
	}

	glog.V(2).Infof("Using file path: %q", target)
	return nil
}

// CreateBackupFiles prepares a path to save backup files and computes the timestamp
// from which to start the backup.
func (h *fileHandler) CreateBackupFiles(uri *url.URL, req *Request) error {
var fileName string

if !pathExist(uri.Path) {
return errors.Errorf("The path %q does not exist or it is inaccessible.", uri.Path)
}

// Find the max Since value from the latest backup. This is done only when starting
// a new backup, not when creating a manifest.
if req.Manifest == nil {
var lastManifest string
suffix := filepath.Join(string(filepath.Separator), backupManifest)
_ = x.WalkPathFunc(uri.Path, func(path string, isdir bool) bool {
if !isdir && strings.HasSuffix(path, suffix) && path > lastManifest {
lastManifest = path
}
return false
})

if lastManifest != "" {
var m Manifest
if err := h.readManifest(lastManifest, &m); err != nil {
return err
}
// Find the max Since value from the latest backup.
var lastManifest string
suffix := filepath.Join(string(filepath.Separator), backupManifest)
_ = x.WalkPathFunc(uri.Path, func(path string, isdir bool) bool {
if !isdir && strings.HasSuffix(path, suffix) && path > lastManifest {
lastManifest = path
}
return false
})

req.Since = m.Since
if lastManifest != "" {
var m Manifest
if err := h.readManifest(lastManifest, &m); err != nil {
return err
}
fileName = fmt.Sprintf(backupNameFmt, req.Backup.ReadTs, req.Backup.GroupId)
} else {
fileName = backupManifest

req.Since = m.Since
}
fileName = fmt.Sprintf(backupNameFmt, req.Backup.ReadTs, req.Backup.GroupId)

// If a full backup is being forced, force Since to zero to stream all
// the contents from the database.
if req.Backup.ForceFull {
req.Since = 0
}

dir = filepath.Join(uri.Path, fmt.Sprintf(backupPathFmt, req.Backup.UnixTs))
err := os.Mkdir(dir, 0700)
if err != nil && !os.IsExist(err) {
return err
}
return h.createFiles(uri, req, fileName)
}

path = filepath.Join(dir, fileName)
h.fp, err = os.Create(path)
if err != nil {
return err
// CreateManifest creates the backup manifest file.
func (h *fileHandler) CreateManifest(uri *url.URL, req *Request, manifest *Manifest) error {
if !pathExist(uri.Path) {
return errors.Errorf("The path %q does not exist or it is inaccessible.", uri.Path)
}
glog.V(2).Infof("Using file path: %q", path)

return nil
return h.createFiles(uri, req, backupManifest)
}

// Load tries to load any backup files found.
Expand Down Expand Up @@ -127,7 +136,7 @@ func (h *fileHandler) Load(uri *url.URL, fn loadFn) (uint64, error) {
if err := h.readManifest(manifest, &m); err != nil {
return 0, errors.Wrapf(err, "While reading %q", manifest)
}
if m.ReadTs == 0 || m.Since == 0 || len(m.Groups) == 0 {
if m.Since == 0 || m.Since == 0 || len(m.Groups) == 0 {
martinmr marked this conversation as resolved.
Show resolved Hide resolved
if glog.V(2) {
fmt.Printf("Restore: skip backup: %s: %#v\n", manifest, &m)
}
Expand All @@ -136,7 +145,7 @@ func (h *fileHandler) Load(uri *url.URL, fn loadFn) (uint64, error) {

path := filepath.Dir(manifest)
for _, groupId := range m.Groups {
file := filepath.Join(path, fmt.Sprintf(backupNameFmt, m.ReadTs, groupId))
file := filepath.Join(path, fmt.Sprintf(backupNameFmt, m.Since, groupId))
fp, err := os.Open(file)
if err != nil {
return 0, errors.Wrapf(err, "Failed to open %q", file)
Expand Down
Loading