diff --git a/go/vt/mysqlctl/azblobbackupstorage/azblob.go b/go/vt/mysqlctl/azblobbackupstorage/azblob.go index a9346506e82..19a8ab331a0 100644 --- a/go/vt/mysqlctl/azblobbackupstorage/azblob.go +++ b/go/vt/mysqlctl/azblobbackupstorage/azblob.go @@ -127,6 +127,21 @@ func (bh *AZBlobBackupHandle) Name() string { return bh.name } +// RecordError is part of the concurrency.ErrorRecorder interface. +func (bh *AZBlobBackupHandle) RecordError(err error) { + bh.errors.RecordError(err) +} + +// HasErrors is part of the concurrency.ErrorRecorder interface. +func (bh *AZBlobBackupHandle) HasErrors() bool { + return bh.errors.HasErrors() +} + +// Error is part of the concurrency.ErrorRecorder interface. +func (bh *AZBlobBackupHandle) Error() error { + return bh.errors.Error() +} + // AddFile implements BackupHandle. func (bh *AZBlobBackupHandle) AddFile(ctx context.Context, filename string, filesize int64) (io.WriteCloser, error) { if bh.readOnly { @@ -156,7 +171,7 @@ func (bh *AZBlobBackupHandle) AddFile(ctx context.Context, filename string, file }) if err != nil { reader.CloseWithError(err) - bh.errors.RecordError(err) + bh.RecordError(err) } }() @@ -169,7 +184,7 @@ func (bh *AZBlobBackupHandle) EndBackup(ctx context.Context) error { return fmt.Errorf("EndBackup cannot be called on read-only backup") } bh.waitGroup.Wait() - return bh.errors.Error() + return bh.Error() } // AbortBackup implements BackupHandle. diff --git a/go/vt/mysqlctl/backupstorage/interface.go b/go/vt/mysqlctl/backupstorage/interface.go index e4c2e6bc18d..78720a12e08 100644 --- a/go/vt/mysqlctl/backupstorage/interface.go +++ b/go/vt/mysqlctl/backupstorage/interface.go @@ -24,6 +24,7 @@ import ( "io" "golang.org/x/net/context" + "vitess.io/vitess/go/vt/concurrency" ) var ( @@ -74,6 +75,10 @@ type BackupHandle interface { // The context is valid for the duration of the reads, until the // ReadCloser is closed. ReadFile(ctx context.Context, filename string) (io.ReadCloser, error) + + // concurrency.ErrorRecorder is embedded here to coordinate reporting and + // handling of errors among all the components involved in taking a backup. + concurrency.ErrorRecorder } // BackupStorage is the interface to the storage system diff --git a/go/vt/mysqlctl/builtinbackupengine.go b/go/vt/mysqlctl/builtinbackupengine.go index f49f8fdf780..25c8b87a768 100644 --- a/go/vt/mysqlctl/builtinbackupengine.go +++ b/go/vt/mysqlctl/builtinbackupengine.go @@ -276,7 +276,6 @@ func (be *BuiltinBackupEngine) backupFiles(ctx context.Context, params BackupPar // Backup with the provided concurrency. sema := sync2.NewSemaphore(params.Concurrency, 0) - rec := concurrency.AllErrorRecorder{} wg := sync.WaitGroup{} for i := range fes { wg.Add(1) @@ -287,19 +286,28 @@ func (be *BuiltinBackupEngine) backupFiles(ctx context.Context, params BackupPar // encountered an error. sema.Acquire() defer sema.Release() - if rec.HasErrors() { + if bh.HasErrors() { return } // Backup the individual file. name := fmt.Sprintf("%v", i) - rec.RecordError(be.backupFile(ctx, params, bh, &fes[i], name)) + bh.RecordError(be.backupFile(ctx, params, bh, &fes[i], name)) }(i) } wg.Wait() - if rec.HasErrors() { - return rec.Error() + + // BackupHandle supports the ErrorRecorder interface for tracking errors + // across any goroutines that fan out to take the backup. This means that we + // don't need a local error recorder and can put everything through the bh. + // + // This handles the scenario where bh.AddFile() encounters an error asynchronously, + // which ordinarily would be lost in the context of `be.backupFile`, i.e. if an + // error were encountered + // [here](https://github.com/vitessio/vitess/blob/d26b6c7975b12a87364e471e2e2dfa4e253c2a5b/go/vt/mysqlctl/s3backupstorage/s3.go#L139-L142). + if bh.HasErrors() { + return bh.Error() } // open the MANIFEST diff --git a/go/vt/mysqlctl/cephbackupstorage/ceph.go b/go/vt/mysqlctl/cephbackupstorage/ceph.go index 80c37ceb828..085ecd94828 100644 --- a/go/vt/mysqlctl/cephbackupstorage/ceph.go +++ b/go/vt/mysqlctl/cephbackupstorage/ceph.go @@ -62,6 +62,21 @@ type CephBackupHandle struct { waitGroup sync.WaitGroup } +// RecordError is part of the concurrency.ErrorRecorder interface. +func (bh *CephBackupHandle) RecordError(err error) { + bh.errors.RecordError(err) +} + +// HasErrors is part of the concurrency.ErrorRecorder interface. +func (bh *CephBackupHandle) HasErrors() bool { + return bh.errors.HasErrors() +} + +// Error is part of the concurrency.ErrorRecorder interface. +func (bh *CephBackupHandle) Error() error { + return bh.errors.Error() +} + // Directory implements BackupHandle. func (bh *CephBackupHandle) Directory() string { return bh.dir @@ -94,7 +109,7 @@ func (bh *CephBackupHandle) AddFile(ctx context.Context, filename string, filesi // Signal the writer that an error occurred, in case it's not done writing yet. reader.CloseWithError(err) // In case the error happened after the writer finished, we need to remember it. - bh.errors.RecordError(err) + bh.RecordError(err) } }() // Give our caller the write end of the pipe. @@ -108,7 +123,7 @@ func (bh *CephBackupHandle) EndBackup(ctx context.Context) error { } bh.waitGroup.Wait() // Return the saved PutObject() errors, if any. - return bh.errors.Error() + return bh.Error() } // AbortBackup implements BackupHandle. diff --git a/go/vt/mysqlctl/filebackupstorage/file.go b/go/vt/mysqlctl/filebackupstorage/file.go index c99a0ee6b0a..be5810c1a94 100644 --- a/go/vt/mysqlctl/filebackupstorage/file.go +++ b/go/vt/mysqlctl/filebackupstorage/file.go @@ -28,6 +28,7 @@ import ( "golang.org/x/net/context" + "vitess.io/vitess/go/vt/concurrency" "vitess.io/vitess/go/vt/mysqlctl/backupstorage" ) @@ -43,6 +44,22 @@ type FileBackupHandle struct { dir string name string readOnly bool + errors concurrency.AllErrorRecorder +} + +// RecordError is part of the concurrency.ErrorRecorder interface. +func (fbh *FileBackupHandle) RecordError(err error) { + fbh.errors.RecordError(err) +} + +// HasErrors is part of the concurrency.ErrorRecorder interface. +func (fbh *FileBackupHandle) HasErrors() bool { + return fbh.errors.HasErrors() +} + +// Error is part of the concurrency.ErrorRecorder interface. +func (fbh *FileBackupHandle) Error() error { + return fbh.errors.Error() } // Directory is part of the BackupHandle interface diff --git a/go/vt/mysqlctl/gcsbackupstorage/gcs.go b/go/vt/mysqlctl/gcsbackupstorage/gcs.go index 5473be3217a..7ca9f4796d6 100644 --- a/go/vt/mysqlctl/gcsbackupstorage/gcs.go +++ b/go/vt/mysqlctl/gcsbackupstorage/gcs.go @@ -33,6 +33,7 @@ import ( "google.golang.org/api/option" "vitess.io/vitess/go/trace" + "vitess.io/vitess/go/vt/concurrency" "vitess.io/vitess/go/vt/mysqlctl/backupstorage" ) @@ -51,6 +52,22 @@ type GCSBackupHandle struct { dir string name string readOnly bool + errors concurrency.AllErrorRecorder +} + +// RecordError is part of the concurrency.ErrorRecorder interface. +func (bh *GCSBackupHandle) RecordError(err error) { + bh.errors.RecordError(err) +} + +// HasErrors is part of the concurrency.ErrorRecorder interface. +func (bh *GCSBackupHandle) HasErrors() bool { + return bh.errors.HasErrors() +} + +// Error is part of the concurrency.ErrorRecorder interface. +func (bh *GCSBackupHandle) Error() error { + return bh.errors.Error() } // Directory implements BackupHandle. diff --git a/go/vt/mysqlctl/s3backupstorage/s3.go b/go/vt/mysqlctl/s3backupstorage/s3.go index 55fe11f6942..e01c8c678a5 100644 --- a/go/vt/mysqlctl/s3backupstorage/s3.go +++ b/go/vt/mysqlctl/s3backupstorage/s3.go @@ -41,6 +41,7 @@ import ( "github.com/aws/aws-sdk-go/aws/request" "github.com/aws/aws-sdk-go/aws/session" "github.com/aws/aws-sdk-go/service/s3" + "github.com/aws/aws-sdk-go/service/s3/s3iface" "github.com/aws/aws-sdk-go/service/s3/s3manager" "golang.org/x/net/context" @@ -85,7 +86,7 @@ var logNameMap logNameToLogLevel // S3BackupHandle implements the backupstorage.BackupHandle interface. type S3BackupHandle struct { - client *s3.S3 + client s3iface.S3API bs *S3BackupStorage dir string name string @@ -104,6 +105,21 @@ func (bh *S3BackupHandle) Name() string { return bh.name } +// RecordError is part of the concurrency.ErrorRecorder interface. +func (bh *S3BackupHandle) RecordError(err error) { + bh.errors.RecordError(err) +} + +// HasErrors is part of the concurrency.ErrorRecorder interface. +func (bh *S3BackupHandle) HasErrors() bool { + return bh.errors.HasErrors() +} + +// Error is part of the concurrency.ErrorRecorder interface. +func (bh *S3BackupHandle) Error() error { + return bh.errors.Error() +} + // AddFile is part of the backupstorage.BackupHandle interface. func (bh *S3BackupHandle) AddFile(ctx context.Context, filename string, filesize int64) (io.WriteCloser, error) { if bh.readOnly { @@ -143,7 +159,7 @@ func (bh *S3BackupHandle) AddFile(ctx context.Context, filename string, filesize }) if err != nil { reader.CloseWithError(err) - bh.errors.RecordError(err) + bh.RecordError(err) } }() @@ -156,7 +172,7 @@ func (bh *S3BackupHandle) EndBackup(ctx context.Context) error { return fmt.Errorf("EndBackup cannot be called on read-only backup") } bh.waitGroup.Wait() - return bh.errors.Error() + return bh.Error() } // AbortBackup is part of the backupstorage.BackupHandle interface. diff --git a/go/vt/mysqlctl/s3backupstorage/s3_test.go b/go/vt/mysqlctl/s3backupstorage/s3_test.go new file mode 100644 index 00000000000..25d958934f7 --- /dev/null +++ b/go/vt/mysqlctl/s3backupstorage/s3_test.go @@ -0,0 +1,43 @@ +package s3backupstorage + +import ( + "errors" + "net/http" + "testing" + + "github.com/aws/aws-sdk-go/aws" + "github.com/aws/aws-sdk-go/aws/request" + "github.com/aws/aws-sdk-go/service/s3" + "github.com/aws/aws-sdk-go/service/s3/s3iface" + "github.com/stretchr/testify/assert" + "github.com/stretchr/testify/require" +) + +type s3ErrorClient struct{ s3iface.S3API } + +func (s3errclient *s3ErrorClient) PutObjectRequest(in *s3.PutObjectInput) (*request.Request, *s3.PutObjectOutput) { + req := request.Request{ + HTTPRequest: &http.Request{}, // without this we segfault \_(ツ)_/¯ (see https://github.com/aws/aws-sdk-go/blob/v1.28.8/aws/request/request_context.go#L13) + Error: errors.New("some error"), // this forces req.Send() (which is called by the uploader) to always return non-nil error + } + + return &req, &s3.PutObjectOutput{} +} + +func TestAddFileError(t *testing.T) { + bh := &S3BackupHandle{client: &s3ErrorClient{}, readOnly: false} + + wc, err := bh.AddFile(aws.BackgroundContext(), "somefile", 100000) + require.NoErrorf(t, err, "AddFile() expected no error, got %s", err) + assert.NotNil(t, wc, "AddFile() expected non-nil WriteCloser") + + n, err := wc.Write([]byte("here are some bytes")) + require.NoErrorf(t, err, "TestAddFile() could not write to uploader, got %d bytes written, err %s", n, err) + + err = wc.Close() + require.NoErrorf(t, err, "TestAddFile() could not close writer, got %s", err) + + bh.waitGroup.Wait() // wait for the goroutine to finish, at which point it should have recorded an error + + require.Equal(t, bh.HasErrors(), true, "AddFile() expected bh to record async error but did not") +}