From 1e581f1eadb700e360a17019407c1ad2ba24f782 Mon Sep 17 00:00:00 2001
From: Andy Goldstein
Date: Tue, 19 Dec 2017 12:16:39 -0500
Subject: [PATCH 1/3] BackupController: do as much as possible

When running a backup, try to do as much as possible, collecting
errors along the way, and return an aggregate at the end. This way,
if a backup fails, in most cases we'll still be able to upload the
backup log file to object storage, which wasn't happening before.

Signed-off-by: Andy Goldstein
---
 pkg/cloudprovider/backup_service.go      |  63 ++++++++++----
 pkg/cloudprovider/backup_service_test.go |  36 +++++---
 pkg/controller/backup_controller.go      | 101 +++++++++++------------
 3 files changed, 118 insertions(+), 82 deletions(-)

diff --git a/pkg/cloudprovider/backup_service.go b/pkg/cloudprovider/backup_service.go
index 1a417a4b3a..9e9e16b141 100644
--- a/pkg/cloudprovider/backup_service.go
+++ b/pkg/cloudprovider/backup_service.go
@@ -116,32 +116,63 @@ func NewBackupService(objectStore ObjectStore, logger logrus.FieldLogger) Backup
 	}
 }
 
-func (br *backupService) UploadBackup(bucket, backupName string, metadata, backup, log io.Reader) error {
-	// upload metadata file
-	metadataKey := getMetadataKey(backupName)
-	if err := br.objectStore.PutObject(bucket, metadataKey, metadata); err != nil {
-		// failure to upload metadata file is a hard-stop
-		return err
+func seekToBeginning(r io.Reader) error {
+	seeker, ok := r.(io.Seeker)
+	if !ok {
+		return nil
 	}
 
-	// upload tar file
-	if err := br.objectStore.PutObject(bucket, getBackupContentsKey(backupName), backup); err != nil {
-		// try to delete the metadata file since the data upload failed
-		deleteErr := br.objectStore.DeleteObject(bucket, metadataKey)
+	_, err := seeker.Seek(0, 0)
+	return err
+}
+
+func (br *backupService) seekAndPutObject(bucket, key string, file io.Reader) error {
+	if file == nil {
+		return nil
+	}
 
-		return kerrors.NewAggregate([]error{err, deleteErr})
+	if err := seekToBeginning(file); err != nil {
+		return errors.WithStack(err)
 	}
 
-	// uploading log file is best-effort; if it fails, we log the error but call the overall upload a
-	// success
+	return br.objectStore.PutObject(bucket, key, file)
+}
+
+func (br *backupService) UploadBackup(bucket, backupName string, metadata, backup, log io.Reader) error {
+	// Uploading the log file is best-effort; if it fails, we log the error but it doesn't impact the
+	// backup's status.
 	logKey := getBackupLogKey(backupName)
-	if err := br.objectStore.PutObject(bucket, logKey, log); err != nil {
+	if err := br.seekAndPutObject(bucket, logKey, log); err != nil {
 		br.logger.WithError(err).WithFields(logrus.Fields{
 			"bucket": bucket,
 			"key":    logKey,
 		}).Error("Error uploading log file")
 	}
 
+	if metadata == nil {
+		// If we don't have metadata, something failed, and there's no point in continuing. An object
+		// storage bucket that is missing the metadata file can't be restored, nor can its logs be
+		// viewed.
+		return nil
+	}
+
+	// upload metadata file
+	metadataKey := getMetadataKey(backupName)
+	if err := br.seekAndPutObject(bucket, metadataKey, metadata); err != nil {
+		// failure to upload metadata file is a hard-stop
+		return err
+	}
+
+	if backup != nil {
+		// upload tar file
+		if err := br.seekAndPutObject(bucket, getBackupContentsKey(backupName), backup); err != nil {
+			// try to delete the metadata file since the data upload failed
+			deleteErr := br.objectStore.DeleteObject(bucket, metadataKey)
+
+			return kerrors.NewAggregate([]error{err, deleteErr})
+		}
+	}
+
 	return nil
 }
 
@@ -173,8 +204,8 @@ func (br *backupService) GetAllBackups(bucket string) ([]*api.Backup, error) {
 	return output, nil
 }
 
-func (br *backupService) GetBackup(bucket, name string) (*api.Backup, error) {
-	key := fmt.Sprintf(metadataFileFormatString, name)
+func (br *backupService) GetBackup(bucket, backupName string) (*api.Backup, error) {
+	key := getMetadataKey(backupName)
 
 	res, err := br.objectStore.GetObject(bucket, key)
 	if err != nil {
diff --git a/pkg/cloudprovider/backup_service_test.go b/pkg/cloudprovider/backup_service_test.go
index 4aec228fdf..aa6dfd85a1 100644
--- a/pkg/cloudprovider/backup_service_test.go
+++ b/pkg/cloudprovider/backup_service_test.go
@@ -46,36 +46,46 @@ func TestUploadBackup(t *testing.T) {
 		expectMetadataDelete bool
 		backup               io.ReadSeeker
 		backupError          error
+		expectBackupUpload   bool
 		log                  io.ReadSeeker
 		logError             error
 		expectedErr          string
 	}{
 		{
-			name:     "normal case",
-			metadata: newStringReadSeeker("foo"),
-			backup:   newStringReadSeeker("bar"),
-			log:      newStringReadSeeker("baz"),
+			name:               "normal case",
+			metadata:           newStringReadSeeker("foo"),
+			backup:             newStringReadSeeker("bar"),
+			expectBackupUpload: true,
+			log:                newStringReadSeeker("baz"),
 		},
 		{
-			name:          "error on metadata upload does not upload data or log",
+			name:          "error on metadata upload does not upload data",
 			metadata:      newStringReadSeeker("foo"),
 			metadataError: errors.New("md"),
+			log:           newStringReadSeeker("baz"),
 			expectedErr:   "md",
 		},
 		{
 			name:                 "error on data upload deletes metadata",
 			metadata:             newStringReadSeeker("foo"),
 			backup:               newStringReadSeeker("bar"),
+			expectBackupUpload:   true,
 			backupError:          errors.New("backup"),
 			expectMetadataDelete: true,
 			expectedErr:          "backup",
 		},
 		{
-			name:     "error on log upload is ok",
-			metadata: newStringReadSeeker("foo"),
-			backup:   newStringReadSeeker("bar"),
-			log:      newStringReadSeeker("baz"),
-			logError: errors.New("log"),
+			name:               "error on log upload is ok",
+			metadata:           newStringReadSeeker("foo"),
+			backup:             newStringReadSeeker("bar"),
+			expectBackupUpload: true,
+			log:                newStringReadSeeker("baz"),
+			logError:           errors.New("log"),
+		},
+		{
+			name:   "don't upload data when metadata is nil",
+			backup: newStringReadSeeker("bar"),
+			log:    newStringReadSeeker("baz"),
 		},
 	}
 
@@ -87,11 +97,12 @@ func TestUploadBackup(t *testing.T) {
 				backupName = "test-backup"
 				logger     = arktest.NewLogger()
 			)
+			defer objStore.AssertExpectations(t)
 
 			if test.metadata != nil {
 				objStore.On("PutObject", bucket, backupName+"/ark-backup.json", test.metadata).Return(test.metadataError)
 			}
-			if test.backup != nil {
+			if test.backup != nil && test.expectBackupUpload {
 				objStore.On("PutObject", bucket, backupName+"/"+backupName+".tar.gz", test.backup).Return(test.backupError)
 			}
 			if test.log != nil {
@@ -111,7 +122,6 @@ func TestUploadBackup(t *testing.T) {
 				assert.NoError(t, err)
 			}
 
-			objStore.AssertExpectations(t)
 		})
 	}
 }
@@ -372,5 +382,5 @@ func newStringReadSeeker(s string) *stringReadSeeker {
 }
 
 func (srs *stringReadSeeker) Seek(offset int64, whence int) (int64, error) {
-	panic("not implemented")
+	return 0, nil
 }
diff --git a/pkg/controller/backup_controller.go b/pkg/controller/backup_controller.go
index 3ddc542c61..8fdf167109 100644
--- a/pkg/controller/backup_controller.go
+++ b/pkg/controller/backup_controller.go
@@ -21,6 +21,7 @@ import (
 	"context"
 	"encoding/json"
 	"fmt"
+	"io"
 	"io/ioutil"
 	"os"
 	"sync"
@@ -32,7 +33,7 @@ import (
 	metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
 	"k8s.io/apimachinery/pkg/types"
 	"k8s.io/apimachinery/pkg/util/clock"
-	kuberrs "k8s.io/apimachinery/pkg/util/errors"
+	kerrors "k8s.io/apimachinery/pkg/util/errors"
 	"k8s.io/apimachinery/pkg/util/strategicpatch"
 	"k8s.io/apimachinery/pkg/util/wait"
 	"k8s.io/client-go/tools/cache"
@@ -211,13 +212,14 @@ func (controller *backupController) processBackup(key string) error {
 		return errors.Wrap(err, "error getting backup")
 	}
 
-	// TODO I think this is now unnecessary. We only initially place
-	// item with Phase = ("" | New) into the queue. Items will only get
-	// re-queued if syncHandler returns an error, which will only
-	// happen if there's an error updating Phase from its initial
-	// state to something else. So any time it's re-queued it will
-	// still have its initial state, which we've already confirmed
-	// is ("" | New)
+	// Double-check we have the correct phase. In the unlikely event that multiple controller
+	// instances are running, it's possible for controller A to succeed in changing the phase to
+	// InProgress, while controller B's attempt to patch the phase fails. When controller B
+	// reprocesses the same backup, it will either show up as New (informer hasn't seen the update
+	// yet) or as InProgress. In the former case, the patch attempt will fail again, until the
+	// informer sees the update. In the latter case, after the informer has seen the update to
+	// InProgress, we still need this check so we can return nil to indicate we've finished processing
+	// this key (even though it was a no-op).
 	switch backup.Status.Phase {
 	case "", api.BackupPhaseNew:
 		// only process new backups
@@ -317,39 +319,20 @@ func (controller *backupController) getValidationErrors(itm *api.Backup) []strin
 }
 
 func (controller *backupController) runBackup(backup *api.Backup, bucket string) error {
-	backupFile, err := ioutil.TempFile("", "")
-	if err != nil {
-		return errors.Wrap(err, "error creating temp file for Backup")
-	}
+	log := controller.logger.WithField("backup", kubeutil.NamespaceAndName(backup))
+	log.Info("Starting backup")
 
 	logFile, err := ioutil.TempFile("", "")
 	if err != nil {
-		return errors.Wrap(err, "error creating temp file for Backup log")
+		return errors.Wrap(err, "error creating temp file for backup log")
 	}
+	defer closeAndRemoveFile(logFile, log)
 
-	defer func() {
-		var errs []error
-		// TODO should this be wrapped?
-		errs = append(errs, err)
-
-		if err := backupFile.Close(); err != nil {
-			errs = append(errs, errors.Wrap(err, "error closing Backup temp file"))
-		}
-
-		if err := os.Remove(backupFile.Name()); err != nil {
-			errs = append(errs, errors.Wrap(err, "error removing Backup temp file"))
-		}
-
-		if err := logFile.Close(); err != nil {
-			errs = append(errs, errors.Wrap(err, "error closing Backup log temp file"))
-		}
-
-		if err := os.Remove(logFile.Name()); err != nil {
-			errs = append(errs, errors.Wrap(err, "error removing Backup log temp file"))
-		}
-
-		err = kuberrs.NewAggregate(errs)
-	}()
+	backupFile, err := ioutil.TempFile("", "")
+	if err != nil {
+		return errors.Wrap(err, "error creating temp file for backup")
+	}
+	defer closeAndRemoveFile(backupFile, log)
 
 	actions, err := controller.pluginManager.GetBackupItemActions(backup.Name)
 	if err != nil {
@@ -357,30 +340,42 @@ func (controller *backupController) runBackup(backup *api.Backup, bucket string)
 	}
 	defer controller.pluginManager.CloseBackupItemActions(backup.Name)
 
-	controller.logger.WithField("backup", kubeutil.NamespaceAndName(backup)).Info("starting backup")
+	var errs []error
+	var backupJsonToUpload, backupFileToUpload io.Reader
+
+	// Do the actual backup
 	if err := controller.backupper.Backup(backup, backupFile, logFile, actions); err != nil {
-		return err
-	}
+		errs = append(errs, err)
 
-	controller.logger.WithField("backup", kubeutil.NamespaceAndName(backup)).Info("backup completed")
+		backup.Status.Phase = api.BackupPhaseFailed
+	} else {
+		backup.Status.Phase = api.BackupPhaseCompleted
+	}
 
-	// note: updating this here so the uploaded JSON shows "completed". If
-	// the upload fails, we'll alter the phase in the calling func.
-	backup.Status.Phase = api.BackupPhaseCompleted
+	backupJson := new(bytes.Buffer)
+	if err := encode.EncodeTo(backup, "json", backupJson); err != nil {
+		errs = append(errs, errors.Wrap(err, "error encoding backup"))
+	} else {
+		// Only upload the json and backup tarball if encoding to json succeeded.
+		backupJsonToUpload = backupJson
+		backupFileToUpload = backupFile
+	}
 
-	buf := new(bytes.Buffer)
-	if err := encode.EncodeTo(backup, "json", buf); err != nil {
-		return errors.Wrap(err, "error encoding Backup")
+	if err := controller.backupService.UploadBackup(bucket, backup.Name, backupJsonToUpload, backupFileToUpload, logFile); err != nil {
+		errs = append(errs, err)
 	}
 
-	// re-set the files' offset to 0 for reading
-	if _, err = backupFile.Seek(0, 0); err != nil {
-		return errors.Wrap(err, "error resetting Backup file offset")
+	log.Info("Backup completed")
+
+	return kerrors.NewAggregate(errs)
+}
+
+func closeAndRemoveFile(file *os.File, log logrus.FieldLogger) {
+	if err := file.Close(); err != nil {
+		log.WithError(err).WithField("file", file.Name()).Error("error closing file")
 	}
-	if _, err = logFile.Seek(0, 0); err != nil {
-		return errors.Wrap(err, "error resetting Backup log file offset")
+	if err := os.Remove(file.Name()); err != nil {
+		log.WithError(err).WithField("file", file.Name()).Error("error removing file")
 	}
-
-	return controller.backupService.UploadBackup(bucket, backup.Name, buf, backupFile, logFile)
 }

From 0fc087c967e3332f665dfb216264a253b87594ee Mon Sep 17 00:00:00 2001
From: Andy Goldstein
Date: Thu, 21 Dec 2017 11:56:33 -0500
Subject: [PATCH 2/3] Flatten aggregated errors

Signed-off-by: Andy Goldstein
---
 pkg/backup/backup.go | 2 +-
 1 file changed, 1 insertion(+), 1 deletion(-)

diff --git a/pkg/backup/backup.go b/pkg/backup/backup.go
index 89d0377551..3644c7f0ad 100644
--- a/pkg/backup/backup.go
+++ b/pkg/backup/backup.go
@@ -246,7 +246,7 @@ func (kb *kubernetesBackupper) Backup(backup *api.Backup, backupFile, logFile io
 		}
 	}
 
-	err = kuberrs.NewAggregate(errs)
+	err = kuberrs.Flatten(kuberrs.NewAggregate(errs))
 	if err == nil {
 		log.Infof("Backup completed successfully")
 	} else {

From 1b124a314663e01108ab90d0417cea455d7ef4f6 Mon Sep 17 00:00:00 2001
From: Andy Goldstein
Date: Wed, 3 Jan 2018 13:02:38 -0500
Subject: [PATCH 3/3] Log backup item action error when it occurs

If a backup item action errors, log the error as soon as it occurs,
so it's clear when the error happened. Also include information about
the groupResource, namespace, and name of the item in the error.

Signed-off-by: Andy Goldstein
---
 pkg/backup/item_backupper.go | 7 ++++++-
 1 file changed, 6 insertions(+), 1 deletion(-)

diff --git a/pkg/backup/item_backupper.go b/pkg/backup/item_backupper.go
index a75573d18e..ec5015113e 100644
--- a/pkg/backup/item_backupper.go
+++ b/pkg/backup/item_backupper.go
@@ -213,7 +213,12 @@ func (ib *defaultItemBackupper) backupItem(logger logrus.FieldLogger, obj runtim
 			ib.additionalItemBackupper.backupItem(log, additionalItem, gvr.GroupResource())
 		}
 	} else {
-		return errors.Wrap(err, "error executing custom action")
+		// We want this to show up in the log file at the place where the error occurs. When we return
+		// the error, it gets aggregated with all the other ones at the end of the backup, making it
+		// harder to tell when it happened.
+		log.WithError(err).Error("error executing custom action")
+
+		return errors.Wrapf(err, "error executing custom action (groupResource=%s, namespace=%s, name=%s)", groupResource.String(), namespace, name)
 	}
 }