From 0bc6666a0b509ff85fe20b38d0ccf7649262f396 Mon Sep 17 00:00:00 2001 From: Anthony Yeh Date: Tue, 10 Sep 2019 00:11:39 -0700 Subject: [PATCH 1/2] Update to latest GCS client. Signed-off-by: Anthony Yeh --- go.mod | 6 +++--- go.sum | 11 +++++++++++ 2 files changed, 14 insertions(+), 3 deletions(-) diff --git a/go.mod b/go.mod index c4ba3b740f7..65d441910d2 100644 --- a/go.mod +++ b/go.mod @@ -3,7 +3,7 @@ module vitess.io/vitess go 1.12 require ( - cloud.google.com/go v0.43.0 + cloud.google.com/go v0.45.1 github.com/armon/go-metrics v0.0.0-20190430140413-ec5e00d3c878 // indirect github.com/aws/aws-sdk-go v0.0.0-20180223184012-ebef4262e06a github.com/boltdb/bolt v1.3.1 // indirect @@ -22,6 +22,7 @@ require ( github.com/golang/mock v1.3.1 github.com/golang/protobuf v1.3.2 github.com/golang/snappy v0.0.0-20170215233205-553a64147049 + github.com/google/btree v1.0.0 // indirect github.com/gorilla/websocket v0.0.0-20160912153041-2d1e4548da23 github.com/grpc-ecosystem/go-grpc-middleware v0.0.0-20190118093823-f849b5445de4 github.com/grpc-ecosystem/go-grpc-prometheus v0.0.0-20180418170936-39de4380c2e0 @@ -67,8 +68,7 @@ require ( golang.org/x/text v0.3.2 golang.org/x/time v0.0.0-20190308202827-9d24e82272b4 golang.org/x/tools v0.0.0-20190830154057-c17b040389b9 - google.golang.org/api v0.7.0 - google.golang.org/genproto v0.0.0-20190801165951-fa694d86fc64 // indirect + google.golang.org/api v0.9.0 google.golang.org/grpc v1.21.1 gopkg.in/asn1-ber.v1 v1.0.0-20150924051756-4e86f4367175 // indirect gopkg.in/check.v1 v1.0.0-20190902080502-41f04d3bba15 // indirect diff --git a/go.sum b/go.sum index 37c48e4115c..18f93540941 100644 --- a/go.sum +++ b/go.sum @@ -3,6 +3,12 @@ cloud.google.com/go v0.34.0/go.mod h1:aQUYkXzVsufM+DwF1aE+0xfcU+56JwCaLick0ClmMT cloud.google.com/go v0.38.0/go.mod h1:990N+gfupTy94rShfmMCWGDn0LpTmnzTp2qbd1dvSRU= cloud.google.com/go v0.43.0 h1:banaiRPAM8kUVYneOSkhgcDsLzEvL25FinuiSZaH/2w= cloud.google.com/go v0.43.0/go.mod h1:BOSR3VbTLkk6FDC/TcffxP4NF/FFBGA5ku+jvKOP7pg= +cloud.google.com/go v0.44.1/go.mod h1:iSa0KzasP4Uvy3f1mN/7PiObzGgflwredwwASm/v6AU= +cloud.google.com/go v0.44.2/go.mod h1:60680Gw3Yr4ikxnPRS/oxxkBccT6SA1yMk63TGekxKY= +cloud.google.com/go v0.45.1 h1:lRi0CHyU+ytlvylOlFKKq0af6JncuyoRh1J+QJBqQx0= +cloud.google.com/go v0.45.1/go.mod h1:RpBamKRgapWJb87xiFSdk4g1CME7QZg3uwTez+TSTjc= +cloud.google.com/go/bigquery v1.0.1/go.mod h1:i/xbL2UlR5RvWAURpBYZTtm/cXjCha9lbfbpx4poX+o= +cloud.google.com/go/datastore v1.0.0/go.mod h1:LXYbyblFSglQ5pkeyhO+Qmw7ukd3C+pD7TKLgZqpHYE= github.com/BurntSushi/toml v0.3.1 h1:WXkYYl6Yr3qBf1K79EBnL4mak0OimBfB0XUf9Vl28OQ= github.com/BurntSushi/toml v0.3.1/go.mod h1:xHWCNGjB5oqiDr8zfno3MHue2Ht5sIBksp03qcyfWMU= github.com/BurntSushi/xgb v0.0.0-20160522181843-27f122750802/go.mod h1:IVnqGOEym/WlBOVXweHU+Q+/VP0lqqI8lqeDx9IjBqo= @@ -280,6 +286,9 @@ golang.org/x/xerrors v0.0.0-20190717185122-a985d3407aa7/go.mod h1:I/5z698sn9Ka8T google.golang.org/api v0.4.0/go.mod h1:8k5glujaEP+g9n7WNsDg8QP6cUVNI86fCNMcbazEtwE= google.golang.org/api v0.7.0 h1:9sdfJOzWlkqPltHAuzT2Cp+yrBeY1KRVYgms8soxMwM= google.golang.org/api v0.7.0/go.mod h1:WtwebWUNSVBH/HAw79HIFXZNqEvBhG+Ra+ax0hx3E3M= +google.golang.org/api v0.8.0/go.mod h1:o4eAsZoiT+ibD93RtjEohWalFOjRDx6CVaqeizhEnKg= +google.golang.org/api v0.9.0 h1:jbyannxz0XFD3zdjgrSUsaJbgpH4eTrkdhRChkHPfO8= +google.golang.org/api v0.9.0/go.mod h1:o4eAsZoiT+ibD93RtjEohWalFOjRDx6CVaqeizhEnKg= google.golang.org/appengine v1.1.0/go.mod h1:EbEs0AVv82hx2wNQdGPgUI5lhzA/G0D9YwlJXL52JkM= google.golang.org/appengine v1.4.0/go.mod h1:xpcJRLb0r/rnEns0DIKYYv+WjYCduHsrkT7/EB5XEv4= google.golang.org/appengine v1.5.0/go.mod h1:xpcJRLb0r/rnEns0DIKYYv+WjYCduHsrkT7/EB5XEv4= @@ -293,6 +302,8 @@ google.golang.org/genproto v0.0.0-20190502173448-54afdca5d873/go.mod h1:VzzqZJRn google.golang.org/genproto v0.0.0-20190716160619-c506a9f90610/go.mod h1:DMBHOl98Agz4BDEuKkezgsaosCRResVns1a3J2ZsMNc= google.golang.org/genproto v0.0.0-20190801165951-fa694d86fc64 h1:iKtrH9Y8mcbADOP0YFaEMth7OfuHY9xHOwNj4znpM1A= google.golang.org/genproto v0.0.0-20190801165951-fa694d86fc64/go.mod h1:DMBHOl98Agz4BDEuKkezgsaosCRResVns1a3J2ZsMNc= +google.golang.org/genproto v0.0.0-20190819201941-24fa4b261c55 h1:gSJIx1SDwno+2ElGhA4+qG2zF97qiUzTM+rQ0klBOcE= +google.golang.org/genproto v0.0.0-20190819201941-24fa4b261c55/go.mod h1:DMBHOl98Agz4BDEuKkezgsaosCRResVns1a3J2ZsMNc= google.golang.org/grpc v1.19.0/go.mod h1:mqu4LbDTu4XGKhr4mRzUsmM4RtVoemTSY81AxZiDr8c= google.golang.org/grpc v1.20.1/go.mod h1:10oTOabMzJvdu6/UiuZezV6QK5dSlG84ov/aaiqXj38= google.golang.org/grpc v1.21.1 h1:j6XxA85m/6txkUCHvzlV5f+HBNl/1r5cZ2A/3IEFOO8= From 3f9ff2bd367ba81d7d2ac3162ad30760c295c8a4 Mon Sep 17 00:00:00 2001 From: Anthony Yeh Date: Tue, 10 Sep 2019 00:36:24 -0700 Subject: [PATCH 2/2] xtrabackup: Make sure all files are closed before writing MANIFEST. We've observed a backup that was missing files, yet had a MANIFEST. In the built-in backup engine, the contract was that the MANIFEST file must not be written unless all files were confirmed to have been uploaded successfully. In XtraBackup mode, we were not meeting this contract because an error that occurred while closing a file would not be noticed until after we had written the MANIFEST. Signed-off-by: Anthony Yeh --- go/vt/mysqlctl/xtrabackupengine.go | 133 ++++++++++++++++------------- 1 file changed, 76 insertions(+), 57 deletions(-) diff --git a/go/vt/mysqlctl/xtrabackupengine.go b/go/vt/mysqlctl/xtrabackupengine.go index 31158dba7d9..8ce8fb31dd4 100644 --- a/go/vt/mysqlctl/xtrabackupengine.go +++ b/go/vt/mysqlctl/xtrabackupengine.go @@ -106,6 +106,15 @@ func (be *XtrabackupEngine) backupFileName() string { return fileName } +func closeFile(wc io.WriteCloser, fileName string, logger logutil.Logger, finalErr *error) { + if closeErr := wc.Close(); *finalErr == nil { + *finalErr = closeErr + } else if closeErr != nil { + // since we already have an error just log this + logger.Errorf("error closing file %v: %v", fileName, closeErr) + } +} + // ExecuteBackup returns a boolean that indicates if the backup is usable, // and an overall error. func (be *XtrabackupEngine) ExecuteBackup(ctx context.Context, cnf *Mycnf, mysqld MysqlDaemon, logger logutil.Logger, bh backupstorage.BackupHandle, backupConcurrency int, hookExtraEnv map[string]string) (complete bool, finalErr error) { @@ -128,6 +137,56 @@ func (be *XtrabackupEngine) ExecuteBackup(ctx context.Context, cnf *Mycnf, mysql flavor := pos.GTIDSet.Flavor() logger.Infof("Detected MySQL flavor: %v", flavor) + backupFileName := be.backupFileName() + numStripes := int(*xtrabackupStripes) + + // Perform backups in a separate function, so deferred calls to Close() are + // all done before we continue to write the MANIFEST. This ensures that we + // do not write the MANIFEST unless all files were closed successfully, + // maintaining the contract that a MANIFEST file should only exist if the + // backup was created successfully. + replicationPosition, err := be.backupFiles(ctx, cnf, logger, bh, backupFileName, numStripes, flavor) + if err != nil { + return false, err + } + + // open the MANIFEST + mwc, err := bh.AddFile(ctx, backupManifestFileName, 0) + if err != nil { + return false, vterrors.Wrapf(err, "cannot add %v to backup", backupManifestFileName) + } + defer closeFile(mwc, backupManifestFileName, logger, &finalErr) + + // JSON-encode and write the MANIFEST + bm := &xtraBackupManifest{ + // Common base fields + BackupManifest: BackupManifest{ + BackupMethod: xtrabackupEngineName, + Position: replicationPosition, + FinishedTime: time.Now().UTC().Format(time.RFC3339), + }, + + // XtraBackup-specific fields + FileName: backupFileName, + StreamMode: *xtrabackupStreamMode, + SkipCompress: !*backupStorageCompress, + Params: *xtrabackupBackupFlags, + NumStripes: int32(numStripes), + StripeBlockSize: int32(*xtrabackupStripeBlockSize), + } + + data, err := json.MarshalIndent(bm, "", " ") + if err != nil { + return false, vterrors.Wrapf(err, "cannot JSON encode %v", backupManifestFileName) + } + if _, err := mwc.Write([]byte(data)); err != nil { + return false, vterrors.Wrapf(err, "cannot write %v", backupManifestFileName) + } + + return true, nil +} + +func (be *XtrabackupEngine) backupFiles(ctx context.Context, cnf *Mycnf, logger logutil.Logger, bh backupstorage.BackupHandle, backupFileName string, numStripes int, flavor string) (replicationPosition mysql.Position, finalErr error) { backupProgram := path.Join(*xtrabackupEnginePath, xtrabackupBinaryName) flagsToExec := []string{"--defaults-file=" + cnf.path, @@ -140,40 +199,32 @@ func (be *XtrabackupEngine) ExecuteBackup(ctx context.Context, cnf *Mycnf, mysql if *xtrabackupStreamMode != "" { flagsToExec = append(flagsToExec, "--stream="+*xtrabackupStreamMode) } - if *xtrabackupBackupFlags != "" { flagsToExec = append(flagsToExec, strings.Fields(*xtrabackupBackupFlags)...) } - backupFileName := be.backupFileName() - numStripes := int(*xtrabackupStripes) - destFiles, err := addStripeFiles(ctx, bh, backupFileName, numStripes, logger) if err != nil { - return false, vterrors.Wrapf(err, "cannot create backup file %v", backupFileName) - } - closeFile := func(wc io.WriteCloser, fileName string) { - if closeErr := wc.Close(); finalErr == nil { - finalErr = closeErr - } else if closeErr != nil { - // since we already have an error just log this - logger.Errorf("error closing file %v: %v", fileName, closeErr) - } + return replicationPosition, vterrors.Wrapf(err, "cannot create backup file %v", backupFileName) } defer func() { - for _, file := range destFiles { - closeFile(file, backupFileName) + filename := backupFileName + for i, file := range destFiles { + if numStripes > 1 { + filename = stripeFileName(backupFileName, i) + } + closeFile(file, filename, logger, &finalErr) } }() backupCmd := exec.CommandContext(ctx, backupProgram, flagsToExec...) backupOut, err := backupCmd.StdoutPipe() if err != nil { - return false, vterrors.Wrap(err, "cannot create stdout pipe") + return replicationPosition, vterrors.Wrap(err, "cannot create stdout pipe") } backupErr, err := backupCmd.StderrPipe() if err != nil { - return false, vterrors.Wrap(err, "cannot create stderr pipe") + return replicationPosition, vterrors.Wrap(err, "cannot create stderr pipe") } destWriters := []io.Writer{} @@ -188,7 +239,7 @@ func (be *XtrabackupEngine) ExecuteBackup(ctx context.Context, cnf *Mycnf, mysql if *backupStorageCompress { compressor, err := pgzip.NewWriterLevel(writer, pgzip.BestSpeed) if err != nil { - return false, vterrors.Wrap(err, "cannot create gzip compressor") + return replicationPosition, vterrors.Wrap(err, "cannot create gzip compressor") } compressor.SetConcurrency(*backupCompressBlockSize, *backupCompressBlocks) writer = compressor @@ -199,7 +250,7 @@ func (be *XtrabackupEngine) ExecuteBackup(ctx context.Context, cnf *Mycnf, mysql } if err = backupCmd.Start(); err != nil { - return false, vterrors.Wrap(err, "unable to start backup") + return replicationPosition, vterrors.Wrap(err, "unable to start backup") } // Read stderr in the background, so we can log progress as xtrabackup runs. @@ -240,20 +291,20 @@ func (be *XtrabackupEngine) ExecuteBackup(ctx context.Context, cnf *Mycnf, mysql blockSize = 1024 } if _, err := copyToStripes(destWriters, backupOut, blockSize); err != nil { - return false, vterrors.Wrap(err, "cannot copy output from xtrabackup command") + return replicationPosition, vterrors.Wrap(err, "cannot copy output from xtrabackup command") } // Close compressor to flush it. After that all data is sent to the buffer. for _, compressor := range destCompressors { if err := compressor.Close(); err != nil { - return false, vterrors.Wrap(err, "cannot close gzip compressor") + return replicationPosition, vterrors.Wrap(err, "cannot close gzip compressor") } } // Flush the buffer to finish writing on destination. for _, buffer := range destBuffers { if err = buffer.Flush(); err != nil { - return false, vterrors.Wrapf(err, "cannot flush destination: %v", backupFileName) + return replicationPosition, vterrors.Wrapf(err, "cannot flush destination: %v", backupFileName) } } @@ -263,47 +314,15 @@ func (be *XtrabackupEngine) ExecuteBackup(ctx context.Context, cnf *Mycnf, mysql sterrOutput := stderrBuilder.String() if err := backupCmd.Wait(); err != nil { - return false, vterrors.Wrap(err, "xtrabackup failed with error") + return replicationPosition, vterrors.Wrap(err, "xtrabackup failed with error") } replicationPosition, rerr := findReplicationPosition(sterrOutput, flavor, logger) if rerr != nil { - return false, vterrors.Wrap(rerr, "backup failed trying to find replication position") + return replicationPosition, vterrors.Wrap(rerr, "backup failed trying to find replication position") } - // open the MANIFEST - mwc, err := bh.AddFile(ctx, backupManifestFileName, 0) - if err != nil { - return false, vterrors.Wrapf(err, "cannot add %v to backup", backupManifestFileName) - } - defer closeFile(mwc, backupManifestFileName) - // JSON-encode and write the MANIFEST - bm := &xtraBackupManifest{ - // Common base fields - BackupManifest: BackupManifest{ - BackupMethod: xtrabackupEngineName, - Position: replicationPosition, - FinishedTime: time.Now().UTC().Format(time.RFC3339), - }, - - // XtraBackup-specific fields - FileName: backupFileName, - StreamMode: *xtrabackupStreamMode, - SkipCompress: !*backupStorageCompress, - Params: *xtrabackupBackupFlags, - NumStripes: int32(numStripes), - StripeBlockSize: int32(*xtrabackupStripeBlockSize), - } - - data, err := json.MarshalIndent(bm, "", " ") - if err != nil { - return false, vterrors.Wrapf(err, "cannot JSON encode %v", backupManifestFileName) - } - if _, err := mwc.Write([]byte(data)); err != nil { - return false, vterrors.Wrapf(err, "cannot write %v", backupManifestFileName) - } - - return true, nil + return replicationPosition, nil } // ExecuteRestore restores from a backup. Any error is returned.