diff --git a/go.mod b/go.mod index c4ba3b740f7..65d441910d2 100644 --- a/go.mod +++ b/go.mod @@ -3,7 +3,7 @@ module vitess.io/vitess go 1.12 require ( - cloud.google.com/go v0.43.0 + cloud.google.com/go v0.45.1 github.com/armon/go-metrics v0.0.0-20190430140413-ec5e00d3c878 // indirect github.com/aws/aws-sdk-go v0.0.0-20180223184012-ebef4262e06a github.com/boltdb/bolt v1.3.1 // indirect @@ -22,6 +22,7 @@ require ( github.com/golang/mock v1.3.1 github.com/golang/protobuf v1.3.2 github.com/golang/snappy v0.0.0-20170215233205-553a64147049 + github.com/google/btree v1.0.0 // indirect github.com/gorilla/websocket v0.0.0-20160912153041-2d1e4548da23 github.com/grpc-ecosystem/go-grpc-middleware v0.0.0-20190118093823-f849b5445de4 github.com/grpc-ecosystem/go-grpc-prometheus v0.0.0-20180418170936-39de4380c2e0 @@ -67,8 +68,7 @@ require ( golang.org/x/text v0.3.2 golang.org/x/time v0.0.0-20190308202827-9d24e82272b4 golang.org/x/tools v0.0.0-20190830154057-c17b040389b9 - google.golang.org/api v0.7.0 - google.golang.org/genproto v0.0.0-20190801165951-fa694d86fc64 // indirect + google.golang.org/api v0.9.0 google.golang.org/grpc v1.21.1 gopkg.in/asn1-ber.v1 v1.0.0-20150924051756-4e86f4367175 // indirect gopkg.in/check.v1 v1.0.0-20190902080502-41f04d3bba15 // indirect diff --git a/go.sum b/go.sum index 37c48e4115c..18f93540941 100644 --- a/go.sum +++ b/go.sum @@ -3,6 +3,12 @@ cloud.google.com/go v0.34.0/go.mod h1:aQUYkXzVsufM+DwF1aE+0xfcU+56JwCaLick0ClmMT cloud.google.com/go v0.38.0/go.mod h1:990N+gfupTy94rShfmMCWGDn0LpTmnzTp2qbd1dvSRU= cloud.google.com/go v0.43.0 h1:banaiRPAM8kUVYneOSkhgcDsLzEvL25FinuiSZaH/2w= cloud.google.com/go v0.43.0/go.mod h1:BOSR3VbTLkk6FDC/TcffxP4NF/FFBGA5ku+jvKOP7pg= +cloud.google.com/go v0.44.1/go.mod h1:iSa0KzasP4Uvy3f1mN/7PiObzGgflwredwwASm/v6AU= +cloud.google.com/go v0.44.2/go.mod h1:60680Gw3Yr4ikxnPRS/oxxkBccT6SA1yMk63TGekxKY= +cloud.google.com/go v0.45.1 h1:lRi0CHyU+ytlvylOlFKKq0af6JncuyoRh1J+QJBqQx0= +cloud.google.com/go v0.45.1/go.mod h1:RpBamKRgapWJb87xiFSdk4g1CME7QZg3uwTez+TSTjc= +cloud.google.com/go/bigquery v1.0.1/go.mod h1:i/xbL2UlR5RvWAURpBYZTtm/cXjCha9lbfbpx4poX+o= +cloud.google.com/go/datastore v1.0.0/go.mod h1:LXYbyblFSglQ5pkeyhO+Qmw7ukd3C+pD7TKLgZqpHYE= github.com/BurntSushi/toml v0.3.1 h1:WXkYYl6Yr3qBf1K79EBnL4mak0OimBfB0XUf9Vl28OQ= github.com/BurntSushi/toml v0.3.1/go.mod h1:xHWCNGjB5oqiDr8zfno3MHue2Ht5sIBksp03qcyfWMU= github.com/BurntSushi/xgb v0.0.0-20160522181843-27f122750802/go.mod h1:IVnqGOEym/WlBOVXweHU+Q+/VP0lqqI8lqeDx9IjBqo= @@ -280,6 +286,9 @@ golang.org/x/xerrors v0.0.0-20190717185122-a985d3407aa7/go.mod h1:I/5z698sn9Ka8T google.golang.org/api v0.4.0/go.mod h1:8k5glujaEP+g9n7WNsDg8QP6cUVNI86fCNMcbazEtwE= google.golang.org/api v0.7.0 h1:9sdfJOzWlkqPltHAuzT2Cp+yrBeY1KRVYgms8soxMwM= google.golang.org/api v0.7.0/go.mod h1:WtwebWUNSVBH/HAw79HIFXZNqEvBhG+Ra+ax0hx3E3M= +google.golang.org/api v0.8.0/go.mod h1:o4eAsZoiT+ibD93RtjEohWalFOjRDx6CVaqeizhEnKg= +google.golang.org/api v0.9.0 h1:jbyannxz0XFD3zdjgrSUsaJbgpH4eTrkdhRChkHPfO8= +google.golang.org/api v0.9.0/go.mod h1:o4eAsZoiT+ibD93RtjEohWalFOjRDx6CVaqeizhEnKg= google.golang.org/appengine v1.1.0/go.mod h1:EbEs0AVv82hx2wNQdGPgUI5lhzA/G0D9YwlJXL52JkM= google.golang.org/appengine v1.4.0/go.mod h1:xpcJRLb0r/rnEns0DIKYYv+WjYCduHsrkT7/EB5XEv4= google.golang.org/appengine v1.5.0/go.mod h1:xpcJRLb0r/rnEns0DIKYYv+WjYCduHsrkT7/EB5XEv4= @@ -293,6 +302,8 @@ google.golang.org/genproto v0.0.0-20190502173448-54afdca5d873/go.mod h1:VzzqZJRn google.golang.org/genproto v0.0.0-20190716160619-c506a9f90610/go.mod h1:DMBHOl98Agz4BDEuKkezgsaosCRResVns1a3J2ZsMNc= google.golang.org/genproto v0.0.0-20190801165951-fa694d86fc64 h1:iKtrH9Y8mcbADOP0YFaEMth7OfuHY9xHOwNj4znpM1A= google.golang.org/genproto v0.0.0-20190801165951-fa694d86fc64/go.mod h1:DMBHOl98Agz4BDEuKkezgsaosCRResVns1a3J2ZsMNc= +google.golang.org/genproto v0.0.0-20190819201941-24fa4b261c55 h1:gSJIx1SDwno+2ElGhA4+qG2zF97qiUzTM+rQ0klBOcE= +google.golang.org/genproto v0.0.0-20190819201941-24fa4b261c55/go.mod h1:DMBHOl98Agz4BDEuKkezgsaosCRResVns1a3J2ZsMNc= google.golang.org/grpc v1.19.0/go.mod h1:mqu4LbDTu4XGKhr4mRzUsmM4RtVoemTSY81AxZiDr8c= google.golang.org/grpc v1.20.1/go.mod h1:10oTOabMzJvdu6/UiuZezV6QK5dSlG84ov/aaiqXj38= google.golang.org/grpc v1.21.1 h1:j6XxA85m/6txkUCHvzlV5f+HBNl/1r5cZ2A/3IEFOO8= diff --git a/go/vt/mysqlctl/xtrabackupengine.go b/go/vt/mysqlctl/xtrabackupengine.go index 31158dba7d9..8ce8fb31dd4 100644 --- a/go/vt/mysqlctl/xtrabackupengine.go +++ b/go/vt/mysqlctl/xtrabackupengine.go @@ -106,6 +106,15 @@ func (be *XtrabackupEngine) backupFileName() string { return fileName } +func closeFile(wc io.WriteCloser, fileName string, logger logutil.Logger, finalErr *error) { + if closeErr := wc.Close(); *finalErr == nil { + *finalErr = closeErr + } else if closeErr != nil { + // since we already have an error just log this + logger.Errorf("error closing file %v: %v", fileName, closeErr) + } +} + // ExecuteBackup returns a boolean that indicates if the backup is usable, // and an overall error. func (be *XtrabackupEngine) ExecuteBackup(ctx context.Context, cnf *Mycnf, mysqld MysqlDaemon, logger logutil.Logger, bh backupstorage.BackupHandle, backupConcurrency int, hookExtraEnv map[string]string) (complete bool, finalErr error) { @@ -128,6 +137,56 @@ func (be *XtrabackupEngine) ExecuteBackup(ctx context.Context, cnf *Mycnf, mysql flavor := pos.GTIDSet.Flavor() logger.Infof("Detected MySQL flavor: %v", flavor) + backupFileName := be.backupFileName() + numStripes := int(*xtrabackupStripes) + + // Perform backups in a separate function, so deferred calls to Close() are + // all done before we continue to write the MANIFEST. This ensures that we + // do not write the MANIFEST unless all files were closed successfully, + // maintaining the contract that a MANIFEST file should only exist if the + // backup was created successfully. + replicationPosition, err := be.backupFiles(ctx, cnf, logger, bh, backupFileName, numStripes, flavor) + if err != nil { + return false, err + } + + // open the MANIFEST + mwc, err := bh.AddFile(ctx, backupManifestFileName, 0) + if err != nil { + return false, vterrors.Wrapf(err, "cannot add %v to backup", backupManifestFileName) + } + defer closeFile(mwc, backupManifestFileName, logger, &finalErr) + + // JSON-encode and write the MANIFEST + bm := &xtraBackupManifest{ + // Common base fields + BackupManifest: BackupManifest{ + BackupMethod: xtrabackupEngineName, + Position: replicationPosition, + FinishedTime: time.Now().UTC().Format(time.RFC3339), + }, + + // XtraBackup-specific fields + FileName: backupFileName, + StreamMode: *xtrabackupStreamMode, + SkipCompress: !*backupStorageCompress, + Params: *xtrabackupBackupFlags, + NumStripes: int32(numStripes), + StripeBlockSize: int32(*xtrabackupStripeBlockSize), + } + + data, err := json.MarshalIndent(bm, "", " ") + if err != nil { + return false, vterrors.Wrapf(err, "cannot JSON encode %v", backupManifestFileName) + } + if _, err := mwc.Write([]byte(data)); err != nil { + return false, vterrors.Wrapf(err, "cannot write %v", backupManifestFileName) + } + + return true, nil +} + +func (be *XtrabackupEngine) backupFiles(ctx context.Context, cnf *Mycnf, logger logutil.Logger, bh backupstorage.BackupHandle, backupFileName string, numStripes int, flavor string) (replicationPosition mysql.Position, finalErr error) { backupProgram := path.Join(*xtrabackupEnginePath, xtrabackupBinaryName) flagsToExec := []string{"--defaults-file=" + cnf.path, @@ -140,40 +199,32 @@ func (be *XtrabackupEngine) ExecuteBackup(ctx context.Context, cnf *Mycnf, mysql if *xtrabackupStreamMode != "" { flagsToExec = append(flagsToExec, "--stream="+*xtrabackupStreamMode) } - if *xtrabackupBackupFlags != "" { flagsToExec = append(flagsToExec, strings.Fields(*xtrabackupBackupFlags)...) } - backupFileName := be.backupFileName() - numStripes := int(*xtrabackupStripes) - destFiles, err := addStripeFiles(ctx, bh, backupFileName, numStripes, logger) if err != nil { - return false, vterrors.Wrapf(err, "cannot create backup file %v", backupFileName) - } - closeFile := func(wc io.WriteCloser, fileName string) { - if closeErr := wc.Close(); finalErr == nil { - finalErr = closeErr - } else if closeErr != nil { - // since we already have an error just log this - logger.Errorf("error closing file %v: %v", fileName, closeErr) - } + return replicationPosition, vterrors.Wrapf(err, "cannot create backup file %v", backupFileName) } defer func() { - for _, file := range destFiles { - closeFile(file, backupFileName) + filename := backupFileName + for i, file := range destFiles { + if numStripes > 1 { + filename = stripeFileName(backupFileName, i) + } + closeFile(file, filename, logger, &finalErr) } }() backupCmd := exec.CommandContext(ctx, backupProgram, flagsToExec...) backupOut, err := backupCmd.StdoutPipe() if err != nil { - return false, vterrors.Wrap(err, "cannot create stdout pipe") + return replicationPosition, vterrors.Wrap(err, "cannot create stdout pipe") } backupErr, err := backupCmd.StderrPipe() if err != nil { - return false, vterrors.Wrap(err, "cannot create stderr pipe") + return replicationPosition, vterrors.Wrap(err, "cannot create stderr pipe") } destWriters := []io.Writer{} @@ -188,7 +239,7 @@ func (be *XtrabackupEngine) ExecuteBackup(ctx context.Context, cnf *Mycnf, mysql if *backupStorageCompress { compressor, err := pgzip.NewWriterLevel(writer, pgzip.BestSpeed) if err != nil { - return false, vterrors.Wrap(err, "cannot create gzip compressor") + return replicationPosition, vterrors.Wrap(err, "cannot create gzip compressor") } compressor.SetConcurrency(*backupCompressBlockSize, *backupCompressBlocks) writer = compressor @@ -199,7 +250,7 @@ func (be *XtrabackupEngine) ExecuteBackup(ctx context.Context, cnf *Mycnf, mysql } if err = backupCmd.Start(); err != nil { - return false, vterrors.Wrap(err, "unable to start backup") + return replicationPosition, vterrors.Wrap(err, "unable to start backup") } // Read stderr in the background, so we can log progress as xtrabackup runs. @@ -240,20 +291,20 @@ func (be *XtrabackupEngine) ExecuteBackup(ctx context.Context, cnf *Mycnf, mysql blockSize = 1024 } if _, err := copyToStripes(destWriters, backupOut, blockSize); err != nil { - return false, vterrors.Wrap(err, "cannot copy output from xtrabackup command") + return replicationPosition, vterrors.Wrap(err, "cannot copy output from xtrabackup command") } // Close compressor to flush it. After that all data is sent to the buffer. for _, compressor := range destCompressors { if err := compressor.Close(); err != nil { - return false, vterrors.Wrap(err, "cannot close gzip compressor") + return replicationPosition, vterrors.Wrap(err, "cannot close gzip compressor") } } // Flush the buffer to finish writing on destination. for _, buffer := range destBuffers { if err = buffer.Flush(); err != nil { - return false, vterrors.Wrapf(err, "cannot flush destination: %v", backupFileName) + return replicationPosition, vterrors.Wrapf(err, "cannot flush destination: %v", backupFileName) } } @@ -263,47 +314,15 @@ func (be *XtrabackupEngine) ExecuteBackup(ctx context.Context, cnf *Mycnf, mysql sterrOutput := stderrBuilder.String() if err := backupCmd.Wait(); err != nil { - return false, vterrors.Wrap(err, "xtrabackup failed with error") + return replicationPosition, vterrors.Wrap(err, "xtrabackup failed with error") } replicationPosition, rerr := findReplicationPosition(sterrOutput, flavor, logger) if rerr != nil { - return false, vterrors.Wrap(rerr, "backup failed trying to find replication position") + return replicationPosition, vterrors.Wrap(rerr, "backup failed trying to find replication position") } - // open the MANIFEST - mwc, err := bh.AddFile(ctx, backupManifestFileName, 0) - if err != nil { - return false, vterrors.Wrapf(err, "cannot add %v to backup", backupManifestFileName) - } - defer closeFile(mwc, backupManifestFileName) - // JSON-encode and write the MANIFEST - bm := &xtraBackupManifest{ - // Common base fields - BackupManifest: BackupManifest{ - BackupMethod: xtrabackupEngineName, - Position: replicationPosition, - FinishedTime: time.Now().UTC().Format(time.RFC3339), - }, - - // XtraBackup-specific fields - FileName: backupFileName, - StreamMode: *xtrabackupStreamMode, - SkipCompress: !*backupStorageCompress, - Params: *xtrabackupBackupFlags, - NumStripes: int32(numStripes), - StripeBlockSize: int32(*xtrabackupStripeBlockSize), - } - - data, err := json.MarshalIndent(bm, "", " ") - if err != nil { - return false, vterrors.Wrapf(err, "cannot JSON encode %v", backupManifestFileName) - } - if _, err := mwc.Write([]byte(data)); err != nil { - return false, vterrors.Wrapf(err, "cannot write %v", backupManifestFileName) - } - - return true, nil + return replicationPosition, nil } // ExecuteRestore restores from a backup. Any error is returned.