Skip to content

Commit

Permalink
Merge pull request #250 from ncdc/backup-controller-do-as-much-as-pos…
Browse files Browse the repository at this point in the history
…sible

BackupController: do as much as possible
  • Loading branch information
skriss committed Jan 3, 2018
2 parents 6b0b637 + 1b124a3 commit 656428d
Show file tree
Hide file tree
Showing 5 changed files with 125 additions and 84 deletions.
2 changes: 1 addition & 1 deletion pkg/backup/backup.go
Original file line number Diff line number Diff line change
Expand Up @@ -246,7 +246,7 @@ func (kb *kubernetesBackupper) Backup(backup *api.Backup, backupFile, logFile io
}
}

err = kuberrs.NewAggregate(errs)
err = kuberrs.Flatten(kuberrs.NewAggregate(errs))
if err == nil {
log.Infof("Backup completed successfully")
} else {
Expand Down
7 changes: 6 additions & 1 deletion pkg/backup/item_backupper.go
Original file line number Diff line number Diff line change
Expand Up @@ -213,7 +213,12 @@ func (ib *defaultItemBackupper) backupItem(logger logrus.FieldLogger, obj runtim
ib.additionalItemBackupper.backupItem(log, additionalItem, gvr.GroupResource())
}
} else {
return errors.Wrap(err, "error executing custom action")
// We want this to show up in the log file at the place where the error occurs. When we return
// the error, it get aggregated with all the other ones at the end of the backup, making it
// harder to tell when it happened.
log.WithError(err).Error("error executing custom action")

return errors.Wrapf(err, "error executing custom action (groupResource=%s, namespace=%s, name=%s)", groupResource.String(), namespace, name)
}
}

Expand Down
63 changes: 47 additions & 16 deletions pkg/cloudprovider/backup_service.go
Original file line number Diff line number Diff line change
Expand Up @@ -116,32 +116,63 @@ func NewBackupService(objectStore ObjectStore, logger logrus.FieldLogger) Backup
}
}

func (br *backupService) UploadBackup(bucket, backupName string, metadata, backup, log io.Reader) error {
// upload metadata file
metadataKey := getMetadataKey(backupName)
if err := br.objectStore.PutObject(bucket, metadataKey, metadata); err != nil {
// failure to upload metadata file is a hard-stop
return err
func seekToBeginning(r io.Reader) error {
seeker, ok := r.(io.Seeker)
if !ok {
return nil
}

// upload tar file
if err := br.objectStore.PutObject(bucket, getBackupContentsKey(backupName), backup); err != nil {
// try to delete the metadata file since the data upload failed
deleteErr := br.objectStore.DeleteObject(bucket, metadataKey)
_, err := seeker.Seek(0, 0)
return err
}

func (br *backupService) seekAndPutObject(bucket, key string, file io.Reader) error {
if file == nil {
return nil
}

return kerrors.NewAggregate([]error{err, deleteErr})
if err := seekToBeginning(file); err != nil {
return errors.WithStack(err)
}

// uploading log file is best-effort; if it fails, we log the error but call the overall upload a
// success
return br.objectStore.PutObject(bucket, key, file)
}

func (br *backupService) UploadBackup(bucket, backupName string, metadata, backup, log io.Reader) error {
// Uploading the log file is best-effort; if it fails, we log the error but it doesn't impact the
// backup's status.
logKey := getBackupLogKey(backupName)
if err := br.objectStore.PutObject(bucket, logKey, log); err != nil {
if err := br.seekAndPutObject(bucket, logKey, log); err != nil {
br.logger.WithError(err).WithFields(logrus.Fields{
"bucket": bucket,
"key": logKey,
}).Error("Error uploading log file")
}

if metadata == nil {
// If we don't have metadata, something failed, and there's no point in continuing. An object
// storage bucket that is missing the metadata file can't be restored, nor can its logs be
// viewed.
return nil
}

// upload metadata file
metadataKey := getMetadataKey(backupName)
if err := br.seekAndPutObject(bucket, metadataKey, metadata); err != nil {
// failure to upload metadata file is a hard-stop
return err
}

if backup != nil {
// upload tar file
if err := br.seekAndPutObject(bucket, getBackupContentsKey(backupName), backup); err != nil {
// try to delete the metadata file since the data upload failed
deleteErr := br.objectStore.DeleteObject(bucket, metadataKey)

return kerrors.NewAggregate([]error{err, deleteErr})
}
}

return nil
}

Expand Down Expand Up @@ -173,8 +204,8 @@ func (br *backupService) GetAllBackups(bucket string) ([]*api.Backup, error) {
return output, nil
}

func (br *backupService) GetBackup(bucket, name string) (*api.Backup, error) {
key := fmt.Sprintf(metadataFileFormatString, name)
func (br *backupService) GetBackup(bucket, backupName string) (*api.Backup, error) {
key := getMetadataKey(backupName)

res, err := br.objectStore.GetObject(bucket, key)
if err != nil {
Expand Down
36 changes: 23 additions & 13 deletions pkg/cloudprovider/backup_service_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -46,36 +46,46 @@ func TestUploadBackup(t *testing.T) {
expectMetadataDelete bool
backup io.ReadSeeker
backupError error
expectBackupUpload bool
log io.ReadSeeker
logError error
expectedErr string
}{
{
name: "normal case",
metadata: newStringReadSeeker("foo"),
backup: newStringReadSeeker("bar"),
log: newStringReadSeeker("baz"),
name: "normal case",
metadata: newStringReadSeeker("foo"),
backup: newStringReadSeeker("bar"),
expectBackupUpload: true,
log: newStringReadSeeker("baz"),
},
{
name: "error on metadata upload does not upload data or log",
name: "error on metadata upload does not upload data",
metadata: newStringReadSeeker("foo"),
metadataError: errors.New("md"),
log: newStringReadSeeker("baz"),
expectedErr: "md",
},
{
name: "error on data upload deletes metadata",
metadata: newStringReadSeeker("foo"),
backup: newStringReadSeeker("bar"),
expectBackupUpload: true,
backupError: errors.New("backup"),
expectMetadataDelete: true,
expectedErr: "backup",
},
{
name: "error on log upload is ok",
metadata: newStringReadSeeker("foo"),
backup: newStringReadSeeker("bar"),
log: newStringReadSeeker("baz"),
logError: errors.New("log"),
name: "error on log upload is ok",
metadata: newStringReadSeeker("foo"),
backup: newStringReadSeeker("bar"),
expectBackupUpload: true,
log: newStringReadSeeker("baz"),
logError: errors.New("log"),
},
{
name: "don't upload data when metadata is nil",
backup: newStringReadSeeker("bar"),
log: newStringReadSeeker("baz"),
},
}

Expand All @@ -87,11 +97,12 @@ func TestUploadBackup(t *testing.T) {
backupName = "test-backup"
logger = arktest.NewLogger()
)
defer objStore.AssertExpectations(t)

if test.metadata != nil {
objStore.On("PutObject", bucket, backupName+"/ark-backup.json", test.metadata).Return(test.metadataError)
}
if test.backup != nil {
if test.backup != nil && test.expectBackupUpload {
objStore.On("PutObject", bucket, backupName+"/"+backupName+".tar.gz", test.backup).Return(test.backupError)
}
if test.log != nil {
Expand All @@ -111,7 +122,6 @@ func TestUploadBackup(t *testing.T) {
assert.NoError(t, err)
}

objStore.AssertExpectations(t)
})
}
}
Expand Down Expand Up @@ -372,5 +382,5 @@ func newStringReadSeeker(s string) *stringReadSeeker {
}

func (srs *stringReadSeeker) Seek(offset int64, whence int) (int64, error) {
panic("not implemented")
return 0, nil
}
101 changes: 48 additions & 53 deletions pkg/controller/backup_controller.go
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,7 @@ import (
"context"
"encoding/json"
"fmt"
"io"
"io/ioutil"
"os"
"sync"
Expand All @@ -32,7 +33,7 @@ import (
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/apimachinery/pkg/types"
"k8s.io/apimachinery/pkg/util/clock"
kuberrs "k8s.io/apimachinery/pkg/util/errors"
kerrors "k8s.io/apimachinery/pkg/util/errors"
"k8s.io/apimachinery/pkg/util/strategicpatch"
"k8s.io/apimachinery/pkg/util/wait"
"k8s.io/client-go/tools/cache"
Expand Down Expand Up @@ -211,13 +212,14 @@ func (controller *backupController) processBackup(key string) error {
return errors.Wrap(err, "error getting backup")
}

// TODO I think this is now unnecessary. We only initially place
// item with Phase = ("" | New) into the queue. Items will only get
// re-queued if syncHandler returns an error, which will only
// happen if there's an error updating Phase from its initial
// state to something else. So any time it's re-queued it will
// still have its initial state, which we've already confirmed
// is ("" | New)
// Double-check we have the correct phase. In the unlikely event that multiple controller
// instances are running, it's possible for controller A to succeed in changing the phase to
// InProgress, while controller B's attempt to patch the phase fails. When controller B
// reprocesses the same backup, it will either show up as New (informer hasn't seen the update
// yet) or as InProgress. In the former case, the patch attempt will fail again, until the
// informer sees the update. In the latter case, after the informer has seen the update to
// InProgress, we still need this check so we can return nil to indicate we've finished processing
// this key (even though it was a no-op).
switch backup.Status.Phase {
case "", api.BackupPhaseNew:
// only process new backups
Expand Down Expand Up @@ -322,70 +324,63 @@ func (controller *backupController) getValidationErrors(itm *api.Backup) []strin
}

func (controller *backupController) runBackup(backup *api.Backup, bucket string) error {
backupFile, err := ioutil.TempFile("", "")
if err != nil {
return errors.Wrap(err, "error creating temp file for Backup")
}
log := controller.logger.WithField("backup", kubeutil.NamespaceAndName(backup))
log.Info("Starting backup")

logFile, err := ioutil.TempFile("", "")
if err != nil {
return errors.Wrap(err, "error creating temp file for Backup log")
return errors.Wrap(err, "error creating temp file for backup log")
}
defer closeAndRemoveFile(logFile, log)

defer func() {
var errs []error
// TODO should this be wrapped?
errs = append(errs, err)

if err := backupFile.Close(); err != nil {
errs = append(errs, errors.Wrap(err, "error closing Backup temp file"))
}

if err := os.Remove(backupFile.Name()); err != nil {
errs = append(errs, errors.Wrap(err, "error removing Backup temp file"))
}

if err := logFile.Close(); err != nil {
errs = append(errs, errors.Wrap(err, "error closing Backup log temp file"))
}

if err := os.Remove(logFile.Name()); err != nil {
errs = append(errs, errors.Wrap(err, "error removing Backup log temp file"))
}

err = kuberrs.NewAggregate(errs)
}()
backupFile, err := ioutil.TempFile("", "")
if err != nil {
return errors.Wrap(err, "error creating temp file for backup")
}
defer closeAndRemoveFile(backupFile, log)

actions, err := controller.pluginManager.GetBackupItemActions(backup.Name)
if err != nil {
return err
}
defer controller.pluginManager.CloseBackupItemActions(backup.Name)

controller.logger.WithField("backup", kubeutil.NamespaceAndName(backup)).Info("starting backup")
var errs []error

var backupJsonToUpload, backupFileToUpload io.Reader

// Do the actual backup
if err := controller.backupper.Backup(backup, backupFile, logFile, actions); err != nil {
return err
}
errs = append(errs, err)

controller.logger.WithField("backup", kubeutil.NamespaceAndName(backup)).Info("backup completed")
backup.Status.Phase = api.BackupPhaseFailed
} else {
backup.Status.Phase = api.BackupPhaseCompleted
}

// note: updating this here so the uploaded JSON shows "completed". If
// the upload fails, we'll alter the phase in the calling func.
backup.Status.Phase = api.BackupPhaseCompleted
backupJson := new(bytes.Buffer)
if err := encode.EncodeTo(backup, "json", backupJson); err != nil {
errs = append(errs, errors.Wrap(err, "error encoding backup"))
} else {
// Only upload the json and backup tarball if encoding to json succeeded.
backupJsonToUpload = backupJson
backupFileToUpload = backupFile
}

buf := new(bytes.Buffer)
if err := encode.EncodeTo(backup, "json", buf); err != nil {
return errors.Wrap(err, "error encoding Backup")
if err := controller.backupService.UploadBackup(bucket, backup.Name, backupJsonToUpload, backupFileToUpload, logFile); err != nil {
errs = append(errs, err)
}

// re-set the files' offset to 0 for reading
if _, err = backupFile.Seek(0, 0); err != nil {
return errors.Wrap(err, "error resetting Backup file offset")
log.Info("Backup completed")

return kerrors.NewAggregate(errs)
}

func closeAndRemoveFile(file *os.File, log logrus.FieldLogger) {
if err := file.Close(); err != nil {
log.WithError(err).WithField("file", file.Name()).Error("error closing file")
}
if _, err = logFile.Seek(0, 0); err != nil {
return errors.Wrap(err, "error resetting Backup log file offset")
if err := os.Remove(file.Name()); err != nil {
log.WithError(err).WithField("file", file.Name()).Error("error removing file")
}

return controller.backupService.UploadBackup(bucket, backup.Name, buf, backupFile, logFile)
}

0 comments on commit 656428d

Please sign in to comment.