diff --git a/bootstrap.sh b/bootstrap.sh index 3c58a603efc..89fc619f4c1 100755 --- a/bootstrap.sh +++ b/bootstrap.sh @@ -331,7 +331,7 @@ if [ "$BUILD_TESTS" == 1 ] ; then echo "Found MySQL 5.6+ installation in $VT_MYSQL_ROOT." ;; - "MariaDB" | "MariaDB103" ) + "MariaDB" | "MariaDB103") myversion="$("$VT_MYSQL_ROOT/bin/mysql" --version)" [[ "$myversion" =~ MariaDB ]] || fail "Couldn't find MariaDB in $VT_MYSQL_ROOT. Set VT_MYSQL_ROOT to override search location." echo "Found MariaDB installation in $VT_MYSQL_ROOT." diff --git a/config/mycnf/default-fast.cnf b/config/mycnf/default-fast.cnf index 3b732fdc6df..0046bad32a2 100644 --- a/config/mycnf/default-fast.cnf +++ b/config/mycnf/default-fast.cnf @@ -18,7 +18,7 @@ innodb_flush_log_at_trx_commit = 2 innodb_flush_method = O_DIRECT innodb_lock_wait_timeout = 20 innodb_log_buffer_size = 1M -innodb_log_file_size = 1M +innodb_log_file_size = 4M innodb_log_files_in_group = 2 innodb_log_group_home_dir = {{.InnodbLogGroupHomeDir}} innodb_max_dirty_pages_pct = 75 diff --git a/docker/bootstrap/Dockerfile.mariadb b/docker/bootstrap/Dockerfile.mariadb index 83dfdff8b74..81ee6bb2678 100644 --- a/docker/bootstrap/Dockerfile.mariadb +++ b/docker/bootstrap/Dockerfile.mariadb @@ -1,12 +1,18 @@ FROM vitess/bootstrap:common # Install MariaDB 10 -RUN apt-get update \ +RUN apt-get update -y \ && DEBIAN_FRONTEND=noninteractive apt-get install -y --no-install-recommends \ bzip2 \ mariadb-server \ libmariadbclient-dev \ - && rm -rf /var/lib/apt/lists/* + libdbd-mysql-perl \ + rsync \ + libev4 \ + && rm -rf /var/lib/apt/lists/* \ + && wget https://www.percona.com/downloads/XtraBackup/Percona-XtraBackup-2.4.13/binary/debian/stretch/x86_64/percona-xtrabackup-24_2.4.13-1.stretch_amd64.deb \ + && dpkg -i percona-xtrabackup-24_2.4.13-1.stretch_amd64.deb \ + && rm -f percona-xtrabackup-24_2.4.13-1.stretch_amd64.deb # Bootstrap Vitess WORKDIR /vt/src/vitess.io/vitess diff --git a/docker/bootstrap/Dockerfile.mysql56 b/docker/bootstrap/Dockerfile.mysql56 index 56fd5dba41b..5da4ec29946 100644 --- a/docker/bootstrap/Dockerfile.mysql56 +++ b/docker/bootstrap/Dockerfile.mysql56 @@ -3,9 +3,12 @@ FROM vitess/bootstrap:common # Install MySQL 5.6 RUN for i in $(seq 1 10); do apt-key adv --no-tty --recv-keys --keyserver pool.sks-keyservers.net 5072E1F5 && break; done && \ add-apt-repository 'deb http://repo.mysql.com/apt/debian/ stretch mysql-5.6' && \ - apt-get update && \ - DEBIAN_FRONTEND=noninteractive apt-get install -y mysql-server libmysqlclient-dev && \ - rm -rf /var/lib/apt/lists/* + apt-get update -y && \ + DEBIAN_FRONTEND=noninteractive apt-get install -y mysql-server libmysqlclient-dev libdbd-mysql-perl rsync libev4 && \ + rm -rf /var/lib/apt/lists/* && \ + wget https://www.percona.com/downloads/XtraBackup/Percona-XtraBackup-2.4.13/binary/debian/stretch/x86_64/percona-xtrabackup-24_2.4.13-1.stretch_amd64.deb && \ + dpkg -i percona-xtrabackup-24_2.4.13-1.stretch_amd64.deb && \ + rm -f percona-xtrabackup-24_2.4.13-1.stretch_amd64.deb # Bootstrap Vitess WORKDIR /vt/src/vitess.io/vitess diff --git a/docker/bootstrap/Dockerfile.mysql57 b/docker/bootstrap/Dockerfile.mysql57 index 75d7d03aa17..a251e03031d 100644 --- a/docker/bootstrap/Dockerfile.mysql57 +++ b/docker/bootstrap/Dockerfile.mysql57 @@ -4,9 +4,12 @@ FROM vitess/bootstrap:common RUN apt-get update && DEBIAN_FRONTEND=noninteractive apt-get install -y --no-install-recommends gnupg dirmngr ca-certificates && \ for i in $(seq 1 10); do apt-key adv --no-tty --recv-keys --keyserver ha.pool.sks-keyservers.net 5072E1F5 && break; done && \ add-apt-repository 'deb http://repo.mysql.com/apt/debian/ stretch mysql-5.7' && \ - apt-get update && \ - DEBIAN_FRONTEND=noninteractive apt-get install -y mysql-server libmysqlclient-dev && \ - rm -rf /var/lib/apt/lists/* + apt-get update -y && \ + DEBIAN_FRONTEND=noninteractive apt-get install -y mysql-server libmysqlclient-dev libdbd-mysql-perl rsync libev4 && \ + rm -rf /var/lib/apt/lists/* && \ + wget https://www.percona.com/downloads/XtraBackup/Percona-XtraBackup-2.4.13/binary/debian/stretch/x86_64/percona-xtrabackup-24_2.4.13-1.stretch_amd64.deb && \ + dpkg -i percona-xtrabackup-24_2.4.13-1.stretch_amd64.deb && \ + rm -f percona-xtrabackup-24_2.4.13-1.stretch_amd64.deb # Bootstrap Vitess WORKDIR /vt/src/vitess.io/vitess diff --git a/docker/bootstrap/Dockerfile.mysql80 b/docker/bootstrap/Dockerfile.mysql80 index c82ee741dec..3ef12b7e056 100644 --- a/docker/bootstrap/Dockerfile.mysql80 +++ b/docker/bootstrap/Dockerfile.mysql80 @@ -3,9 +3,12 @@ FROM vitess/bootstrap:common # Install MySQL 5.7 RUN for i in $(seq 1 10); do apt-key adv --no-tty --recv-keys --keyserver ha.pool.sks-keyservers.net 8C718D3B5072E1F5 && break; done && \ add-apt-repository 'deb http://repo.mysql.com/apt/debian/ stretch mysql-8.0' && \ - apt-get update && \ - DEBIAN_FRONTEND=noninteractive apt-get install -y mysql-server libmysqlclient-dev && \ - rm -rf /var/lib/apt/lists/* + apt-get update -y && \ + DEBIAN_FRONTEND=noninteractive apt-get install -y mysql-server libmysqlclient-dev libdbd-mysql-perl rsync libev4 && \ + rm -rf /var/lib/apt/lists/* && \ + wget https://www.percona.com/downloads/XtraBackup/Percona-XtraBackup-8.0.4/binary/debian/stretch/x86_64/percona-xtrabackup-80_8.0.4-1.stretch_amd64.deb && \ + dpkg -i percona-xtrabackup-80_8.0.4-1.stretch_amd64.deb && \ + rm -f percona-xtrabackup-80_8.0.4-1.stretch_amd64.deb # Bootstrap Vitess WORKDIR /vt/src/vitess.io/vitess diff --git a/go/vt/mysqlctl/backup.go b/go/vt/mysqlctl/backup.go index 2248cfb4bf2..e0b9206d17e 100644 --- a/go/vt/mysqlctl/backup.go +++ b/go/vt/mysqlctl/backup.go @@ -19,7 +19,6 @@ package mysqlctl import ( "errors" "flag" - "fmt" "os" "path/filepath" "strings" @@ -31,6 +30,8 @@ import ( "vitess.io/vitess/go/vt/log" "vitess.io/vitess/go/vt/logutil" "vitess.io/vitess/go/vt/mysqlctl/backupstorage" + "vitess.io/vitess/go/vt/proto/vtrpc" + "vitess.io/vitess/go/vt/vterrors" ) // This file handles the backup and restore related code @@ -86,17 +87,17 @@ func Backup(ctx context.Context, cnf *Mycnf, mysqld MysqlDaemon, logger logutil. // Start the backup with the BackupStorage. bs, err := backupstorage.GetBackupStorage() if err != nil { - return err + return vterrors.Wrap(err, "unable to get backup storage") } defer bs.Close() bh, err := bs.StartBackup(ctx, dir, name) if err != nil { - return fmt.Errorf("StartBackup failed: %v", err) + return vterrors.Wrap(err, "StartBackup failed") } be, err := GetBackupEngine() if err != nil { - return fmt.Errorf("Failed to find backup engine: %v", err) + return vterrors.Wrap(err, "failed to find backup engine") } // Take the backup, and either AbortBackup or EndBackup. @@ -132,7 +133,7 @@ func Backup(ctx context.Context, cnf *Mycnf, mysqld MysqlDaemon, logger logutil. func checkNoDB(ctx context.Context, mysqld MysqlDaemon, dbName string) (bool, error) { qr, err := mysqld.FetchSuperQuery(ctx, "SHOW DATABASES") if err != nil { - return false, fmt.Errorf("checkNoDB failed: %v", err) + return false, vterrors.Wrap(err, "checkNoDB failed") } backtickDBName := sqlescape.EscapeID(dbName) @@ -140,7 +141,7 @@ func checkNoDB(ctx context.Context, mysqld MysqlDaemon, dbName string) (bool, er if row[0].ToString() == dbName { tableQr, err := mysqld.FetchSuperQuery(ctx, "SHOW TABLES FROM "+backtickDBName) if err != nil { - return false, fmt.Errorf("checkNoDB failed: %v", err) + return false, vterrors.Wrap(err, "checkNoDB failed") } if len(tableQr.Rows) == 0 { // no tables == empty db, all is well @@ -171,7 +172,7 @@ func removeExistingFiles(cnf *Mycnf) error { } for name, path := range paths { if path == "" { - return fmt.Errorf("can't remove existing files: %v is unknown", name) + return vterrors.Errorf(vtrpc.Code_UNKNOWN, "can't remove existing files: %v is unknown", name) } if strings.HasSuffix(name, ".*") { @@ -181,11 +182,11 @@ func removeExistingFiles(cnf *Mycnf) error { log.Infof("Restore: removing files in %v (%v)", name, path) matches, err := filepath.Glob(path) if err != nil { - return fmt.Errorf("can't expand path glob %q: %v", path, err) + return vterrors.Wrapf(err, "can't expand path glob %q", path) } for _, match := range matches { if err := os.Remove(match); err != nil { - return fmt.Errorf("can't remove existing file from %v (%v): %v", name, match, err) + return vterrors.Wrapf(err, "can't remove existing file from %v (%v)", name, match) } } continue @@ -198,7 +199,7 @@ func removeExistingFiles(cnf *Mycnf) error { } log.Infof("Restore: removing files in %v (%v)", name, path) if err := os.RemoveAll(path); err != nil { - return fmt.Errorf("can't remove existing files in %v (%v): %v", name, path, err) + return vterrors.Wrapf(err, "can't remove existing files in %v (%v)", name, path) } } return nil @@ -251,15 +252,15 @@ func Restore( bhs, err := bs.ListBackups(ctx, dir) if err != nil { - return mysql.Position{}, fmt.Errorf("ListBackups failed: %v", err) + return mysql.Position{}, vterrors.Wrap(err, "ListBackups failed") } if len(bhs) == 0 { // There are no backups (not even broken/incomplete ones). - logger.Errorf("No backup to restore on BackupStorage for directory %v. Starting up empty.", dir) + logger.Errorf("no backup to restore on BackupStorage for directory %v. Starting up empty.", dir) // Since this is an empty database make sure we start replication at the beginning if err = mysqld.ResetReplication(ctx); err == nil { - logger.Errorf("Error reseting slave replication: %v. Continuing", err) + logger.Errorf("error reseting slave replication: %v. Continuing", err) err = ErrNoBackup } @@ -271,7 +272,7 @@ func Restore( be, err := GetBackupEngine() if err != nil { - return mysql.Position{}, fmt.Errorf("Failed to find backup engine: %v", err) + return mysql.Position{}, vterrors.Wrap(err, "Failed to find backup engine") } if rval, err = be.ExecuteRestore(ctx, cnf, mysqld, logger, dir, bhs, restoreConcurrency, hookExtraEnv); err != nil { return rval, err @@ -293,7 +294,7 @@ func Restore( logger.Infof("Restore: running mysql_upgrade") if err := mysqld.RunMysqlUpgrade(); err != nil { - return mysql.Position{}, fmt.Errorf("mysql_upgrade failed: %v", err) + return mysql.Position{}, vterrors.Wrap(err, "mysql_upgrade failed") } // Populate local_metadata before starting without --skip-networking, diff --git a/go/vt/mysqlctl/backupengine.go b/go/vt/mysqlctl/backupengine.go index c793a0493bb..725ea894ae3 100644 --- a/go/vt/mysqlctl/backupengine.go +++ b/go/vt/mysqlctl/backupengine.go @@ -19,17 +19,17 @@ package mysqlctl import ( "context" "flag" - "fmt" "vitess.io/vitess/go/mysql" "vitess.io/vitess/go/vt/logutil" "vitess.io/vitess/go/vt/mysqlctl/backupstorage" + "vitess.io/vitess/go/vt/proto/vtrpc" + "vitess.io/vitess/go/vt/vterrors" ) var ( - // BackupEngineImplementation is the implementation to use - // for BackupEngine. Exported for test purposes. - BackupEngineImplementation = flag.String("backup_engine_implementation", "builtin", "which implementation to use for the backup storage engine") + // BackupEngineImplementation is the implementation to use for BackupEngine + backupEngineImplementation = flag.String("backup_engine_implementation", builtin, "which implementation to use for the backup method, builtin or xtrabackup") ) // BackupEngine is the interface to the backup engine @@ -44,9 +44,9 @@ var BackupEngineMap = make(map[string]BackupEngine) // GetBackupEngine returns the current BackupEngine implementation. // Should be called after flags have been initialized. func GetBackupEngine() (BackupEngine, error) { - be, ok := BackupEngineMap[*BackupEngineImplementation] + be, ok := BackupEngineMap[*backupEngineImplementation] if !ok { - return nil, fmt.Errorf("no registered implementation of BackupEngine") + return nil, vterrors.New(vtrpc.Code_NOT_FOUND, "no registered implementation of BackupEngine") } return be, nil } diff --git a/go/vt/mysqlctl/builtinbackupengine.go b/go/vt/mysqlctl/builtinbackupengine.go index 38a23b85d4f..0d79476aea2 100644 --- a/go/vt/mysqlctl/builtinbackupengine.go +++ b/go/vt/mysqlctl/builtinbackupengine.go @@ -37,6 +37,13 @@ import ( "vitess.io/vitess/go/vt/log" "vitess.io/vitess/go/vt/logutil" "vitess.io/vitess/go/vt/mysqlctl/backupstorage" + "vitess.io/vitess/go/vt/proto/vtrpc" + "vitess.io/vitess/go/vt/vterrors" +) + +const ( + builtin = "builtin" + writerBufferSize = 2 * 1024 * 1024 ) // BuiltinBackupEngine encapsulates the logic of the builtin engine @@ -46,10 +53,10 @@ import ( type BuiltinBackupEngine struct { } -// BackupManifest represents the backup. It lists all the files, the +// builtinBackupManifest represents the backup. It lists all the files, the // Position that the backup was taken at, and the transform hook used, // if any. -type BackupManifest struct { +type builtinBackupManifest struct { // FileEntries contains all the files in the backup FileEntries []FileEntry @@ -93,7 +100,7 @@ func (fe *FileEntry) open(cnf *Mycnf, readOnly bool) (*os.File, error) { case backupData: root = cnf.DataDir default: - return nil, fmt.Errorf("unknown base: %v", fe.Base) + return nil, vterrors.Errorf(vtrpc.Code_UNKNOWN, "unknown base: %v", fe.Base) } // and open the file @@ -102,15 +109,15 @@ func (fe *FileEntry) open(cnf *Mycnf, readOnly bool) (*os.File, error) { var err error if readOnly { if fd, err = os.Open(name); err != nil { - return nil, fmt.Errorf("cannot open source file %v: %v", name, err) + return nil, vterrors.Wrapf(err, "cannot open source file %v", name) } } else { dir := path.Dir(name) if err := os.MkdirAll(dir, os.ModePerm); err != nil { - return nil, fmt.Errorf("cannot create destination directory %v: %v", dir, err) + return nil, vterrors.Wrapf(err, "cannot create destination directory %v", dir) } if fd, err = os.Create(name); err != nil { - return nil, fmt.Errorf("cannot create destination file %v: %v", name, err) + return nil, vterrors.Wrapf(err, "cannot create destination file %v", name) } } return fd, nil @@ -250,13 +257,13 @@ func (be *BuiltinBackupEngine) ExecuteBackup(ctx context.Context, cnf *Mycnf, my // keep going if we're the master, might be a degenerate case sourceIsMaster = true default: - return false, fmt.Errorf("can't get slave status: %v", err) + return false, vterrors.Wrap(err, "can't get slave status") } // get the read-only flag readOnly, err = mysqld.IsReadOnly() if err != nil { - return false, fmt.Errorf("can't get read-only status: %v", err) + return false, vterrors.Wrap(err, "can't get read-only status") } // get the replication position @@ -264,21 +271,21 @@ func (be *BuiltinBackupEngine) ExecuteBackup(ctx context.Context, cnf *Mycnf, my if !readOnly { logger.Infof("turning master read-only before backup") if err = mysqld.SetReadOnly(true); err != nil { - return false, fmt.Errorf("can't set read-only status: %v", err) + return false, vterrors.Wrap(err, "can't set read-only status") } } replicationPosition, err = mysqld.MasterPosition() if err != nil { - return false, fmt.Errorf("can't get master position: %v", err) + return false, vterrors.Wrap(err, "can't get master position") } } else { if err = mysqld.StopSlave(hookExtraEnv); err != nil { - return false, fmt.Errorf("can't stop slave: %v", err) + return false, vterrors.Wrapf(err, "can't stop slave") } var slaveStatus mysql.SlaveStatus slaveStatus, err = mysqld.SlaveStatus() if err != nil { - return false, fmt.Errorf("can't get slave status: %v", err) + return false, vterrors.Wrap(err, "can't get slave status") } replicationPosition = slaveStatus.Position } @@ -287,7 +294,7 @@ func (be *BuiltinBackupEngine) ExecuteBackup(ctx context.Context, cnf *Mycnf, my // shutdown mysqld err = mysqld.Shutdown(ctx, cnf, true) if err != nil { - return false, fmt.Errorf("can't shutdown mysqld: %v", err) + return false, vterrors.Wrap(err, "can't shutdown mysqld") } // Backup everything, capture the error. @@ -297,7 +304,7 @@ func (be *BuiltinBackupEngine) ExecuteBackup(ctx context.Context, cnf *Mycnf, my // Try to restart mysqld err = mysqld.Start(ctx, cnf) if err != nil { - return usable, fmt.Errorf("can't restart mysqld: %v", err) + return usable, vterrors.Wrap(err, "can't restart mysqld") } // Restore original mysqld state that we saved above. @@ -314,12 +321,12 @@ func (be *BuiltinBackupEngine) ExecuteBackup(ctx context.Context, cnf *Mycnf, my if slaveStartRequired { logger.Infof("restarting mysql replication") if err := mysqld.StartSlave(hookExtraEnv); err != nil { - return usable, fmt.Errorf("cannot restart slave: %v", err) + return usable, vterrors.Wrap(err, "cannot restart slave") } // this should be quick, but we might as well just wait if err := WaitForSlaveStart(mysqld, slaveStartDeadline); err != nil { - return usable, fmt.Errorf("slave is not restarting: %v", err) + return usable, vterrors.Wrap(err, "slave is not restarting") } } @@ -337,7 +344,7 @@ func (be *BuiltinBackupEngine) backupFiles(ctx context.Context, cnf *Mycnf, mysq // Get the files to backup. fes, err := findFilesToBackup(cnf) if err != nil { - return fmt.Errorf("can't find files to backup: %v", err) + return vterrors.Wrap(err, "can't find files to backup") } logger.Infof("found %v files to backup", len(fes)) @@ -372,7 +379,7 @@ func (be *BuiltinBackupEngine) backupFiles(ctx context.Context, cnf *Mycnf, mysq // open the MANIFEST wc, err := bh.AddFile(ctx, backupManifest, 0) if err != nil { - return fmt.Errorf("cannot add %v to backup: %v", backupManifest, err) + return vterrors.Wrapf(err, "cannot add %v to backup", backupManifest) } defer func() { if closeErr := wc.Close(); err == nil { @@ -381,7 +388,7 @@ func (be *BuiltinBackupEngine) backupFiles(ctx context.Context, cnf *Mycnf, mysq }() // JSON-encode and write the MANIFEST - bm := &BackupManifest{ + bm := &builtinBackupManifest{ FileEntries: fes, Position: replicationPosition, TransformHook: *backupStorageHook, @@ -389,10 +396,10 @@ func (be *BuiltinBackupEngine) backupFiles(ctx context.Context, cnf *Mycnf, mysq } data, err := json.MarshalIndent(bm, "", " ") if err != nil { - return fmt.Errorf("cannot JSON encode %v: %v", backupManifest, err) + return vterrors.Wrapf(err, "cannot JSON encode %v", backupManifest) } if _, err := wc.Write([]byte(data)); err != nil { - return fmt.Errorf("cannot write %v: %v", backupManifest, err) + return vterrors.Wrapf(err, "cannot write %v", backupManifest) } return nil @@ -416,7 +423,7 @@ func (be *BuiltinBackupEngine) backupFile(ctx context.Context, cnf *Mycnf, mysql // Open the destination file for writing, and a buffer. wc, err := bh.AddFile(ctx, name, fi.Size()) if err != nil { - return fmt.Errorf("cannot add file: %v", err) + return vterrors.Wrapf(err, "cannot add file: %v", name) } defer func() { if rerr := wc.Close(); rerr != nil { @@ -428,7 +435,7 @@ func (be *BuiltinBackupEngine) backupFile(ctx context.Context, cnf *Mycnf, mysql } } }() - dst := bufio.NewWriterSize(wc, 2*1024*1024) + dst := bufio.NewWriterSize(wc, writerBufferSize) // Create the hasher and the tee on top. hasher := newHasher() @@ -442,7 +449,7 @@ func (be *BuiltinBackupEngine) backupFile(ctx context.Context, cnf *Mycnf, mysql h.ExtraEnv = hookExtraEnv pipe, wait, _, err = h.ExecuteAsWritePipe(writer) if err != nil { - return fmt.Errorf("'%v' hook returned error: %v", *backupStorageHook, err) + return vterrors.Wrapf(err, "'%v' hook returned error", *backupStorageHook) } writer = pipe } @@ -452,7 +459,7 @@ func (be *BuiltinBackupEngine) backupFile(ctx context.Context, cnf *Mycnf, mysql if *backupStorageCompress { gzip, err = pgzip.NewWriterLevel(writer, pgzip.BestSpeed) if err != nil { - return fmt.Errorf("cannot create gziper: %v", err) + return vterrors.Wrap(err, "cannot create gziper") } gzip.SetConcurrency(*backupCompressBlockSize, *backupCompressBlocks) writer = gzip @@ -462,33 +469,33 @@ func (be *BuiltinBackupEngine) backupFile(ctx context.Context, cnf *Mycnf, mysql // optional pipe, tee, output file and hasher). _, err = io.Copy(writer, source) if err != nil { - return fmt.Errorf("cannot copy data: %v", err) + return vterrors.Wrap(err, "cannot copy data") } // Close gzip to flush it, after that all data is sent to writer. if gzip != nil { if err = gzip.Close(); err != nil { - return fmt.Errorf("cannot close gzip: %v", err) + return vterrors.Wrap(err, "cannot close gzip") } } // Close the hook pipe if necessary. if pipe != nil { if err := pipe.Close(); err != nil { - return fmt.Errorf("cannot close hook pipe: %v", err) + return vterrors.Wrap(err, "cannot close hook pipe") } stderr, err := wait() if stderr != "" { logger.Infof("'%v' hook returned stderr: %v", *backupStorageHook, stderr) } if err != nil { - return fmt.Errorf("'%v' returned error: %v", *backupStorageHook, err) + return vterrors.Wrapf(err, "'%v' returned error", *backupStorageHook) } } // Flush the buffer to finish writing on destination. if err = dst.Flush(); err != nil { - return fmt.Errorf("cannot flush dst: %v", err) + return vterrors.Wrapf(err, "cannot flush destination: %v", name) } // Save the hash. @@ -510,7 +517,7 @@ func (be *BuiltinBackupEngine) ExecuteRestore( hookExtraEnv map[string]string) (mysql.Position, error) { var bh backupstorage.BackupHandle - var bm BackupManifest + var bm builtinBackupManifest var toRestore int for toRestore = len(bhs) - 1; toRestore >= 0; toRestore-- { @@ -637,7 +644,7 @@ func (be *BuiltinBackupEngine) restoreFile(ctx context.Context, cnf *Mycnf, bh b h.ExtraEnv = hookExtraEnv reader, wait, _, err = h.ExecuteAsReadPipe(reader) if err != nil { - return fmt.Errorf("'%v' hook returned error: %v", transformHook, err) + return vterrors.Wrapf(err, "'%v' hook returned error", transformHook) } } @@ -672,14 +679,14 @@ func (be *BuiltinBackupEngine) restoreFile(ctx context.Context, cnf *Mycnf, bh b log.Infof("'%v' hook returned stderr: %v", transformHook, stderr) } if err != nil { - return fmt.Errorf("'%v' returned error: %v", transformHook, err) + return vterrors.Wrapf(err, "'%v' returned error", transformHook) } } // Check the hash. hash := hasher.HashString() if hash != fe.Hash { - return fmt.Errorf("hash mismatch for %v, got %v expected %v", fe.Name, hash, fe.Hash) + return vterrors.Errorf(vtrpc.Code_INTERNAL, "hash mismatch for %v, got %v expected %v", fe.Name, hash, fe.Hash) } // Flush the buffer. diff --git a/go/vt/mysqlctl/replication.go b/go/vt/mysqlctl/replication.go index 8d926fb9e66..23b4c55efa5 100644 --- a/go/vt/mysqlctl/replication.go +++ b/go/vt/mysqlctl/replication.go @@ -224,6 +224,7 @@ func (mysqld *Mysqld) SetSlavePosition(ctx context.Context, pos mysql.Position) defer conn.Recycle() cmds := conn.SetSlavePositionCommands(pos) + log.Infof("Executing commands to set slave position: %v", cmds) return mysqld.executeSuperQueryListConn(ctx, conn, cmds) } diff --git a/go/vt/mysqlctl/xtrabackupengine.go b/go/vt/mysqlctl/xtrabackupengine.go new file mode 100644 index 00000000000..557fec876f9 --- /dev/null +++ b/go/vt/mysqlctl/xtrabackupengine.go @@ -0,0 +1,490 @@ +/* +Copyright 2019 The Vitess Authors + +Licensed under the Apache License, Version 2.0 (the "License"); +you may not use this file except in compliance with the License. +You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, software +distributed under the License is distributed on an "AS IS" BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +See the License for the specific language governing permissions and +limitations under the License. +*/ + +package mysqlctl + +import ( + "bufio" + "context" + "encoding/json" + "errors" + "flag" + "io" + "io/ioutil" + "os/exec" + "path" + "strings" + + "github.com/klauspost/pgzip" + "vitess.io/vitess/go/mysql" + "vitess.io/vitess/go/vt/log" + "vitess.io/vitess/go/vt/logutil" + "vitess.io/vitess/go/vt/mysqlctl/backupstorage" + "vitess.io/vitess/go/vt/proto/vtrpc" + "vitess.io/vitess/go/vt/vterrors" +) + +// XtrabackupEngine encapsulates the logic of the xtrabackup engine +// it implements the BackupEngine interface and contains all the logic +// required to implement a backup/restore by invoking xtrabackup with +// the appropriate parameters +type XtrabackupEngine struct { +} + +var ( + // path where backup engine program is located + xtrabackupEnginePath = flag.String("xtrabackup_root_path", "", "directory location of the xtrabackup executable, e.g., /usr/bin") + // flags to pass through to backup engine + xtrabackupBackupFlags = flag.String("xtrabackup_backup_flags", "", "flags to pass to backup command. These should be space separated and will be added to the end of the command") + // flags to pass through to restore phase + xbstreamRestoreFlags = flag.String("xbstream_restore_flags", "", "flags to pass to xbstream command during restore. These should be space separated and will be added to the end of the command. These need to match the ones used for backup e.g. --compress / --decompress, --encrypt / --decrypt") + // streaming mode + xtrabackupStreamMode = flag.String("xtrabackup_stream_mode", "tar", "which mode to use if streaming, valid values are tar and xbstream") + xtrabackupUser = flag.String("xtrabackup_user", "", "User that xtrabackup will use to connect to the database server. This user must have all necessary privileges. For details, please refer to xtrabackup documentation.") +) + +const ( + streamModeTar = "tar" + xtrabackupBinaryName = "xtrabackup" + xtrabackupBackupMethod = "xtrabackup" + xbstream = "xbstream" +) + +// xtraBackupManifest represents a backup. +// It stores the name of the backup file, the replication position, +// whether the backup is compressed using gzip, and any extra +// command line parameters used while invoking it. +type xtraBackupManifest struct { + // Name of the backup file + FileName string + // BackupMethod, set to xtrabackup + BackupMethod string + // Position at which the backup was taken + Position mysql.Position + // SkipCompress can be set if the backup files were not run + // through gzip. + SkipCompress bool + // Params are the parameters that backup was run with + Params string `json:"ExtraCommandLineParams"` +} + +func (be *XtrabackupEngine) backupFileName() string { + fileName := "backup" + if *xtrabackupStreamMode != "" { + fileName += "." + fileName += *xtrabackupStreamMode + } + if *backupStorageCompress { + fileName += ".gz" + } + return fileName +} + +// ExecuteBackup returns a boolean that indicates if the backup is usable, +// and an overall error. +func (be *XtrabackupEngine) ExecuteBackup(ctx context.Context, cnf *Mycnf, mysqld MysqlDaemon, logger logutil.Logger, bh backupstorage.BackupHandle, backupConcurrency int, hookExtraEnv map[string]string) (bool, error) { + + if *xtrabackupUser == "" { + return false, vterrors.New(vtrpc.Code_INVALID_ARGUMENT, "xtrabackupUser must be specified.") + } + // use a mysql connection to detect flavor at runtime + conn, err := mysqld.GetDbaConnection() + defer conn.Close() + if err != nil { + return false, vterrors.Wrap(err, "unable to obtain a connection to the database") + } + pos, err := conn.MasterPosition() + if err != nil { + return false, vterrors.Wrap(err, "unable to obtain master position") + } + flavor := pos.GTIDSet.Flavor() + logger.Infof("Detected MySQL flavor: %v", flavor) + + backupProgram := path.Join(*xtrabackupEnginePath, xtrabackupBinaryName) + + flagsToExec := []string{"--defaults-file=" + cnf.path, + "--backup", + "--socket=" + cnf.SocketFile, + "--slave-info", + "--user=" + *xtrabackupUser, + "--target-dir=" + cnf.TmpDir, + } + if *xtrabackupStreamMode != "" { + flagsToExec = append(flagsToExec, "--stream="+*xtrabackupStreamMode) + } + + if *xtrabackupBackupFlags != "" { + flagsToExec = append(flagsToExec, strings.Fields(*xtrabackupBackupFlags)...) + } + + backupFileName := be.backupFileName() + + wc, err := bh.AddFile(ctx, backupFileName, 0) + if err != nil { + return false, vterrors.Wrapf(err, "cannot create backup file %v", backupFileName) + } + closeFile := func(wc io.WriteCloser, fileName string) { + if closeErr := wc.Close(); err == nil { + err = closeErr + } else if closeErr != nil { + // since we already have an error just log this + logger.Errorf("Error closing file %v: %v", fileName, err) + } + } + defer closeFile(wc, backupFileName) + + backupCmd := exec.Command(backupProgram, flagsToExec...) + backupOut, _ := backupCmd.StdoutPipe() + backupErr, _ := backupCmd.StderrPipe() + dst := bufio.NewWriterSize(wc, writerBufferSize) + writer := io.MultiWriter(dst) + + // Create the gzip compression pipe, if necessary. + var gzip *pgzip.Writer + if *backupStorageCompress { + gzip, err = pgzip.NewWriterLevel(writer, pgzip.BestSpeed) + if err != nil { + return false, vterrors.Wrap(err, "cannot create gziper") + } + gzip.SetConcurrency(*backupCompressBlockSize, *backupCompressBlocks) + writer = gzip + } + + if err = backupCmd.Start(); err != nil { + return false, vterrors.Wrap(err, "unable to start backup") + } + + // Copy from the stream output to destination file (optional gzip) + _, err = io.Copy(writer, backupOut) + if err != nil { + return false, vterrors.Wrap(err, "cannot copy output from xtrabackup command") + } + + // Close gzip to flush it, after that all data is sent to writer. + if gzip != nil { + if err = gzip.Close(); err != nil { + return false, vterrors.Wrap(err, "cannot close gzip") + } + } + + // Flush the buffer to finish writing on destination. + if err = dst.Flush(); err != nil { + return false, vterrors.Wrapf(err, "cannot flush destination: %v", backupFileName) + } + + stderrOutput, err := ioutil.ReadAll(backupErr) + if err != nil { + return false, vterrors.Wrap(err, "backup failed while reading command output") + } + execErr := backupCmd.Wait() + if execErr != nil { + return false, vterrors.Wrap(err, "xtrabackup failed with error") + } + output := string(stderrOutput) + logger.Infof("Xtrabackup backup command output: %v", output) + + replicationPosition, rerr := findReplicationPosition(output, flavor, logger) + if rerr != nil { + return false, vterrors.Wrap(rerr, "backup failed trying to find replication position") + } + // open the MANIFEST + mwc, err := bh.AddFile(ctx, backupManifest, 0) + if err != nil { + return false, vterrors.Wrapf(err, "cannot add %v to backup", backupManifest) + } + defer closeFile(mwc, backupManifest) + + // JSON-encode and write the MANIFEST + bm := &xtraBackupManifest{ + FileName: backupFileName, + BackupMethod: xtrabackupBackupMethod, + Position: replicationPosition, + SkipCompress: !*backupStorageCompress, + Params: *xtrabackupBackupFlags, + } + + data, err := json.MarshalIndent(bm, "", " ") + if err != nil { + return false, vterrors.Wrapf(err, "cannot JSON encode %v", backupManifest) + } + if _, err := mwc.Write([]byte(data)); err != nil { + return false, vterrors.Wrapf(err, "cannot write %v", backupManifest) + } + + return true, nil +} + +// ExecuteRestore restores from a backup. Any error is returned. +func (be *XtrabackupEngine) ExecuteRestore( + ctx context.Context, + cnf *Mycnf, + mysqld MysqlDaemon, + logger logutil.Logger, + dir string, + bhs []backupstorage.BackupHandle, + restoreConcurrency int, + hookExtraEnv map[string]string) (mysql.Position, error) { + + var bh backupstorage.BackupHandle + var bm xtraBackupManifest + var index int + zeroPosition := mysql.Position{} + + for index = len(bhs) - 1; index >= 0; index-- { + bh = bhs[index] + rc, err := bh.ReadFile(ctx, backupManifest) + if err != nil { + log.Warningf("Possibly incomplete backup %v in directory %v on BackupStorage: can't read MANIFEST: %v)", bh.Name(), dir, err) + continue + } + + err = json.NewDecoder(rc).Decode(&bm) + rc.Close() + if err != nil { + log.Warningf("Possibly incomplete backup %v in directory %v on BackupStorage (cannot JSON decode MANIFEST: %v)", bh.Name(), dir, err) + continue + } + + logger.Infof("Restore: found backup %v %v to restore with %v file", bh.Directory(), bh.Name(), bm.FileName) + break + } + if index < 0 { + // There is at least one attempted backup, but none could be read. + // This implies there is data we ought to have, so it's not safe to start + // up empty. + return zeroPosition, errors.New("backup(s) found but none could be read, unsafe to start up empty, restart to retry restore") + } + + // Starting from here we won't be able to recover if we get stopped by a cancelled + // context. Thus we use the background context to get through to the finish. + + logger.Infof("Restore: shutdown mysqld") + err := mysqld.Shutdown(context.Background(), cnf, true) + if err != nil { + return zeroPosition, err + } + + logger.Infof("Restore: deleting existing files") + if err := removeExistingFiles(cnf); err != nil { + return zeroPosition, err + } + + logger.Infof("Restore: reinit config file") + err = mysqld.ReinitConfig(context.Background(), cnf) + if err != nil { + return zeroPosition, err + } + + // copy / extract files + logger.Infof("Restore: Extracting all files") + + // first download the file into a tmp dir + // extract all the files + if err := be.restoreFile(ctx, cnf, logger, bh, !bm.SkipCompress, be.backupFileName()); err != nil { + logger.Errorf("error restoring backup file %v:%v", be.backupFileName(), err) + return zeroPosition, err + } + + // copy / extract files + logger.Infof("Restore: Preparing the files") + // prepare the backup + restoreProgram := path.Join(*xtrabackupEnginePath, xtrabackupBinaryName) + flagsToExec := []string{"--defaults-file=" + cnf.path, + "--prepare", + "--target-dir=" + cnf.TmpDir, + } + prepareCmd := exec.Command(restoreProgram, flagsToExec...) + prepareOut, _ := prepareCmd.StdoutPipe() + prepareErr, _ := prepareCmd.StderrPipe() + if err = prepareCmd.Start(); err != nil { + return zeroPosition, vterrors.Wrap(err, "unable to start prepare") + } + + errOutput, _ := ioutil.ReadAll(prepareErr) + stdOutput, _ := ioutil.ReadAll(prepareOut) + err = prepareCmd.Wait() + if string(stdOutput) != "" { + logger.Infof("Prepare stdout %v", string(stdOutput)) + } + output := string(errOutput) + if output != "" { + logger.Infof("Prepare stderr %v", output) + } + + if err != nil { + return zeroPosition, vterrors.Wrap(err, "prepare step failed") + } + + // then copy-back + logger.Infof("Restore: Copying the files") + + flagsToExec = []string{"--defaults-file=" + cnf.path, + "--copy-back", + "--target-dir=" + cnf.TmpDir, + } + copybackCmd := exec.Command(restoreProgram, flagsToExec...) + copybackErr, _ := copybackCmd.StderrPipe() + copybackOut, _ := copybackCmd.StdoutPipe() + + if err = copybackCmd.Start(); err != nil { + return zeroPosition, vterrors.Wrap(err, "unable to start copy-back") + } + + errOutput, _ = ioutil.ReadAll(copybackErr) + stdOutput, _ = ioutil.ReadAll(copybackOut) + err = copybackCmd.Wait() + output = string(errOutput) + if output != "" { + logger.Infof("Copy-back stderr %v", string(output)) + } + if string(stdOutput) != "" { + logger.Infof("Copy-back stdout %v", string(stdOutput)) + } + + if err != nil { + return zeroPosition, vterrors.Wrap(err, "copy-back step failed") + } + + // now find the slave position and return that + logger.Infof("Returning replication position %v", bm.Position) + return bm.Position, nil +} + +// restoreFile restores an individual file. +func (be *XtrabackupEngine) restoreFile( + ctx context.Context, + cnf *Mycnf, + logger logutil.Logger, + bh backupstorage.BackupHandle, + compress bool, + name string) (err error) { + + streamMode := *xtrabackupStreamMode + // Open the source file for reading. + var source io.ReadCloser + source, err = bh.ReadFile(ctx, name) + if err != nil { + return err + } + defer source.Close() + + reader := io.MultiReader(source) + + // Create the uncompresser if needed. + if compress { + gz, err := pgzip.NewReader(reader) + if err != nil { + return err + } + defer func() { + if cerr := gz.Close(); cerr != nil { + if err != nil { + // We already have an error, just log this one. + logger.Errorf("failed to close gunziper %v: %v", name, cerr) + } else { + err = cerr + } + } + }() + reader = gz + } + + switch streamMode { + case streamModeTar: + // now extract the files by running tar + // error if we can't find tar + flagsToExec := []string{"-C", cnf.TmpDir, "-xi"} + tarCmd := exec.Command("tar", flagsToExec...) + logger.Infof("Executing tar cmd with flags %v", flagsToExec) + tarCmd.Stdin = reader + tarOut, _ := tarCmd.StdoutPipe() + tarErr, _ := tarCmd.StderrPipe() + tarCmd.Start() + output, _ := ioutil.ReadAll(tarOut) + errOutput, _ := ioutil.ReadAll(tarErr) + err := tarCmd.Wait() + + if string(output) != "" { + logger.Infof("output from tar: %v ", string(output)) + } + if string(errOutput) != "" { + logger.Infof("error from tar: %v ", string(errOutput)) + } + if err != nil { + return vterrors.Wrap(err, "error from tar") + } + + case xbstream: + // now extract the files by running xbstream + xbstreamProgram := xbstream + flagsToExec := []string{} + if *xbstreamRestoreFlags != "" { + flagsToExec = append(flagsToExec, strings.Fields(*xbstreamRestoreFlags)...) + } + flagsToExec = append(flagsToExec, "-C", cnf.TmpDir, "-x") + xbstreamCmd := exec.Command(xbstreamProgram, flagsToExec...) + logger.Infof("Executing xbstream cmd: %v %v", xbstreamProgram, flagsToExec) + xbstreamCmd.Stdin = reader + xbstreamOut, _ := xbstreamCmd.StdoutPipe() + xbstreamErr, _ := xbstreamCmd.StderrPipe() + xbstreamCmd.Start() + output, _ := ioutil.ReadAll(xbstreamOut) + errOutput, _ := ioutil.ReadAll(xbstreamErr) + err := xbstreamCmd.Wait() + + if string(output) != "" { + logger.Infof("Output from xbstream: %v ", string(output)) + } + if string(errOutput) != "" { + logger.Infof("error from xbstream: %v", string(errOutput)) + } + if err != nil { + return vterrors.Wrap(err, "error from xbstream") + } + default: + return vterrors.Errorf(vtrpc.Code_INVALID_ARGUMENT, "%v is not a valid value for xtrabackup_stream_mode, supported modes are tar and xbstream", streamMode) + } + return nil +} + +func findReplicationPosition(input, flavor string, logger logutil.Logger) (mysql.Position, error) { + substrs := strings.Split(input, "'") + index := -1 + for i, str := range substrs { + if strings.Contains(str, "GTID of the last change") { + index = i + 1 + break + } + } + position := "" + if index != -1 { + // since we are extracting this from the log, it contains newlines + // replace them with a single space to match the SET GLOBAL gtid_purged command in xtrabackup_slave_info + position = strings.Replace(substrs[index], "\n", " ", -1) + } + logger.Infof("Found position: %v", position) + + // flavor is required to parse a string into a mysql.Position + replicationPosition, err := mysql.ParsePosition(flavor, position) + if err != nil { + return mysql.Position{}, err + } + return replicationPosition, nil +} + +func init() { + BackupEngineMap[xtrabackupBackupMethod] = &XtrabackupEngine{} +} diff --git a/go/vt/vttablet/tabletmanager/rpc_backup.go b/go/vt/vttablet/tabletmanager/rpc_backup.go index fd9b4480979..d24cb4c1ef7 100644 --- a/go/vt/vttablet/tabletmanager/rpc_backup.go +++ b/go/vt/vttablet/tabletmanager/rpc_backup.go @@ -25,6 +25,7 @@ import ( "vitess.io/vitess/go/vt/mysqlctl" "vitess.io/vitess/go/vt/topo/topoproto" "vitess.io/vitess/go/vt/topotools" + "vitess.io/vitess/go/vt/vterrors" topodatapb "vitess.io/vitess/go/vt/proto/topodata" ) @@ -58,16 +59,22 @@ func (agent *ActionAgent) Backup(ctx context.Context, concurrency int, logger lo } originalType := tablet.Type - // update our type to BACKUP - if _, err := topotools.ChangeType(ctx, agent.TopoServer, tablet.Alias, topodatapb.TabletType_BACKUP); err != nil { - return err + engine, err := mysqlctl.GetBackupEngine() + if err != nil { + return vterrors.Wrap(err, "failed to find backup engine") } + builtin, _ := engine.(*mysqlctl.BuiltinBackupEngine) + if builtin != nil { + // update our type to BACKUP + if _, err := topotools.ChangeType(ctx, agent.TopoServer, tablet.Alias, topodatapb.TabletType_BACKUP); err != nil { + return err + } - // let's update our internal state (stop query service and other things) - if err := agent.refreshTablet(ctx, "before backup"); err != nil { - return err + // let's update our internal state (stop query service and other things) + if err := agent.refreshTablet(ctx, "before backup"); err != nil { + return err + } } - // create the loggers: tee to console and source l := logutil.NewTeeLogger(logutil.NewConsoleLogger(), logger) @@ -76,22 +83,23 @@ func (agent *ActionAgent) Backup(ctx context.Context, concurrency int, logger lo name := fmt.Sprintf("%v.%v", time.Now().UTC().Format("2006-01-02.150405"), topoproto.TabletAliasString(tablet.Alias)) returnErr := mysqlctl.Backup(ctx, agent.Cnf, agent.MysqlDaemon, l, dir, name, concurrency, agent.hookExtraEnv()) - // change our type back to the original value - _, err = topotools.ChangeType(ctx, agent.TopoServer, tablet.Alias, originalType) - if err != nil { - // failure in changing the topology type is probably worse, - // so returning that (we logged the snapshot error anyway) - if returnErr != nil { - l.Errorf("mysql backup command returned error: %v", returnErr) + if builtin != nil { + // change our type back to the original value + _, err = topotools.ChangeType(ctx, agent.TopoServer, tablet.Alias, originalType) + if err != nil { + // failure in changing the topology type is probably worse, + // so returning that (we logged the snapshot error anyway) + if returnErr != nil { + l.Errorf("mysql backup command returned error: %v", returnErr) + } + returnErr = err } - returnErr = err - } - // let's update our internal state (start query service and other things) - if err := agent.refreshTablet(ctx, "after backup"); err != nil { - return err + // let's update our internal state (start query service and other things) + if err := agent.refreshTablet(ctx, "after backup"); err != nil { + return err + } } - // and re-run health check to be sure to capture any replication delay agent.runHealthCheckLocked() diff --git a/test/backup.py b/test/backup.py index 99c3cc55cfa..47825c2ba9a 100755 --- a/test/backup.py +++ b/test/backup.py @@ -27,6 +27,15 @@ import utils use_mysqlctld = False +use_xtrabackup = False +stream_mode = 'tar' +xtrabackup_args = ['-backup_engine_implementation', + 'xtrabackup', + '-xtrabackup_stream_mode', + stream_mode, + '-xtrabackup_user=vt_dba', + '-xtrabackup_backup_flags', + '--password=VtDbaPass'] tablet_master = None tablet_replica1 = None @@ -152,14 +161,15 @@ def setUp(self): for t in tablet_master, tablet_replica1: t.create_db('vt_test_keyspace') + xtra_args = ['-db-credentials-file', db_credentials_file] + if use_xtrabackup: + xtra_args.extend(xtrabackup_args) tablet_master.init_tablet('replica', 'test_keyspace', '0', start=True, supports_backups=True, - extra_args=['-db-credentials-file', - db_credentials_file]) + extra_args=xtra_args) tablet_replica1.init_tablet('replica', 'test_keyspace', '0', start=True, supports_backups=True, - extra_args=['-db-credentials-file', - db_credentials_file]) + extra_args=xtra_args) utils.run_vtctl(['InitShardMaster', '-force', 'test_keyspace/0', tablet_master.tablet_alias]) @@ -209,12 +219,17 @@ def _check_data(self, t, count, msg): def _restore(self, t, tablet_type='replica'): """Erase mysql/tablet dir, then start tablet with restore enabled.""" self._reset_tablet_dir(t) + + xtra_args = ['-db-credentials-file', db_credentials_file] + if use_xtrabackup: + xtra_args.extend(xtrabackup_args) + t.start_vttablet(wait_for_state='SERVING', init_tablet_type=tablet_type, init_keyspace='test_keyspace', init_shard='0', supports_backups=True, - extra_args=['-db-credentials-file', db_credentials_file]) + extra_args=xtra_args) # check semi-sync is enabled for replica, disabled for rdonly. if tablet_type == 'replica': @@ -422,6 +437,24 @@ def test_master_slave_same_backup(self): self._check_data(tablet_replica1, 3, 'replica1 getting data from restored master') + # This is to test that replicationPosition is processed correctly + # while doing backup/restore after a reparent. + # It is written into the MANIFEST and read back from the MANIFEST. + + # Take another backup on the slave. + utils.run_vtctl(['Backup', tablet_replica1.tablet_alias], auto_log=True) + + # Insert more data on replica2 (current master). + self._insert_data(tablet_replica2, 4) + + # Force replica1 to restore from backup. + tablet_replica1.kill_vttablet() + self._restore(tablet_replica1) + + # Wait for replica1 to catch up. + self._check_data(tablet_replica1, 4, + 'replica1 getting data from master after reparent+backup+restore') + tablet_replica2.kill_vttablet() def _restore_old_master_test(self, restore_method): @@ -484,11 +517,20 @@ def _terminated_restore(t): logging.info('waiting for restore to finish') utils.wait_for_tablet_type(t.tablet_alias, 'replica', timeout=30) + # this test is run standalone with xtrabackup because it fails when run + # with the other master restore tests + if use_xtrabackup: + return + utils.Vtctld().start() self._restore_old_master_test(_terminated_restore) def test_backup_transform(self): """Use a transform, tests we backup and restore properly.""" + if use_xtrabackup: + # not supported + return + # Insert data on master, make sure slave gets it. tablet_master.mquery('vt_test_keyspace', self._create_vt_insert_test) self._insert_data(tablet_master, 1) @@ -496,13 +538,18 @@ def test_backup_transform(self): # Restart the replica with the transform parameter. tablet_replica1.kill_vttablet() + + xtra_args = ['-db-credentials-file', db_credentials_file] + if use_xtrabackup: + xtra_args.extend(xtrabackup_args) + + hook_args = ['-backup_storage_hook', + 'test_backup_transform', + '-backup_storage_compress=false'] + xtra_args.extend(hook_args) + tablet_replica1.start_vttablet(supports_backups=True, - extra_args=[ - '-backup_storage_hook', - 'test_backup_transform', - '-backup_storage_compress=false', - '-db-credentials-file', - db_credentials_file]) + extra_args=xtra_args) # Take a backup, it should work. utils.run_vtctl(['Backup', tablet_replica1.tablet_alias], auto_log=True) @@ -537,13 +584,19 @@ def test_backup_transform(self): def test_backup_transform_error(self): """Use a transform, force an error, make sure the backup fails.""" + if use_xtrabackup: + # not supported + return + # Restart the replica with the transform parameter. tablet_replica1.kill_vttablet() + xtra_args = ['-db-credentials-file', db_credentials_file] + if use_xtrabackup: + xtra_args.extend(xtrabackup_args) + hook_args = ['-backup_storage_hook','test_backup_error'] + xtra_args.extend(hook_args) tablet_replica1.start_vttablet(supports_backups=True, - extra_args=['-backup_storage_hook', - 'test_backup_error', - '-db-credentials-file', - db_credentials_file]) + extra_args=xtra_args) # This will fail, make sure we get the right error. _, err = utils.run_vtctl(['Backup', tablet_replica1.tablet_alias], diff --git a/test/config.json b/test/config.json index 87616c1491f..36c0cc3ecfe 100644 --- a/test/config.json +++ b/test/config.json @@ -19,6 +19,33 @@ "RetryMax": 0, "Tags": [] }, + "xtrabackup": { + "File": "xtrabackup.py", + "Args": [], + "Command": [], + "Manual": false, + "Shard": 3, + "RetryMax": 0, + "Tags": [] + }, + "xtrabackup_xbstream": { + "File": "xtrabackup_xbstream.py", + "Args": [], + "Command": [], + "Manual": false, + "Shard": 3, + "RetryMax": 0, + "Tags": [] + }, + "xtrabackup_xtra": { + "File": "xtrabackup_xtra.py", + "Args": [], + "Command": [], + "Manual": false, + "Shard": 3, + "RetryMax": 0, + "Tags": [] + }, "binlog": { "File": "binlog.py", "Args": [], diff --git a/test/xtrabackup.py b/test/xtrabackup.py new file mode 100755 index 00000000000..135bdf60ce8 --- /dev/null +++ b/test/xtrabackup.py @@ -0,0 +1,25 @@ +#!/usr/bin/env python + +# Copyright 2019 The Vitess Authors +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. + + +"""Re-runs backup.py with use_xtrabackup=True.""" + +import backup +import utils + +if __name__ == '__main__': + backup.use_xtrabackup = True + utils.main(backup) diff --git a/test/xtrabackup_xbstream.py b/test/xtrabackup_xbstream.py new file mode 100755 index 00000000000..751d2f6af62 --- /dev/null +++ b/test/xtrabackup_xbstream.py @@ -0,0 +1,26 @@ +#!/usr/bin/env python + +# Copyright 2019 The Vitess Authors +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. + + +"""Re-runs backup.py with use_xtrabackup=True.""" + +import backup +import utils + +if __name__ == '__main__': + backup.use_xtrabackup = True + backup.stream_mode = 'xbstream' + utils.main(backup) diff --git a/test/xtrabackup_xtra.py b/test/xtrabackup_xtra.py new file mode 100755 index 00000000000..c3e04d9874c --- /dev/null +++ b/test/xtrabackup_xtra.py @@ -0,0 +1,320 @@ +#!/usr/bin/env python + +# Copyright 2019 The Vitess Authors +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. + + +import json +import logging +import os +import unittest + +import MySQLdb + +import environment +import tablet +import utils + +use_mysqlctld = False +use_xtrabackup = True +stream_mode = 'tar' +xtrabackup_args = ['-backup_engine_implementation', + 'xtrabackup', + '-xtrabackup_stream_mode', + stream_mode, + '-xtrabackup_user=vt_dba', + '-xtrabackup_backup_flags', + '--password=VtDbaPass'] + +tablet_master = None +tablet_replica1 = None +tablet_replica2 = None + +new_init_db = '' +db_credentials_file = '' + + +def setUpModule(): + global new_init_db, db_credentials_file + global tablet_master, tablet_replica1, tablet_replica2 + + tablet_master = tablet.Tablet(use_mysqlctld=use_mysqlctld, + vt_dba_passwd='VtDbaPass') + tablet_replica1 = tablet.Tablet(use_mysqlctld=use_mysqlctld, + vt_dba_passwd='VtDbaPass') + tablet_replica2 = tablet.Tablet(use_mysqlctld=use_mysqlctld, + vt_dba_passwd='VtDbaPass') + + try: + environment.topo_server().setup() + + # Determine which column is used for user passwords in this MySQL version. + proc = tablet_master.init_mysql() + if use_mysqlctld: + tablet_master.wait_for_mysqlctl_socket() + else: + utils.wait_procs([proc]) + try: + tablet_master.mquery('mysql', 'select password from mysql.user limit 0', + user='root') + password_col = 'password' + except MySQLdb.DatabaseError: + password_col = 'authentication_string' + utils.wait_procs([tablet_master.teardown_mysql()]) + tablet_master.remove_tree(ignore_options=True) + + # Create a new init_db.sql file that sets up passwords for all users. + # Then we use a db-credentials-file with the passwords. + new_init_db = environment.tmproot + '/init_db_with_passwords.sql' + with open(environment.vttop + '/config/init_db.sql') as fd: + init_db = fd.read() + with open(new_init_db, 'w') as fd: + fd.write(init_db) + fd.write(''' +# Set real passwords for all users except vt_backup +UPDATE mysql.user SET %s = PASSWORD('RootPass') + WHERE User = 'root' AND Host = 'localhost'; +UPDATE mysql.user SET %s = PASSWORD('VtDbaPass') + WHERE User = 'vt_dba' AND Host = 'localhost'; +UPDATE mysql.user SET %s = PASSWORD('VtAppPass') + WHERE User = 'vt_app' AND Host = 'localhost'; +UPDATE mysql.user SET %s = PASSWORD('VtAllprivsPass') + WHERE User = 'vt_allprivs' AND Host = 'localhost'; +UPDATE mysql.user SET %s = PASSWORD('VtReplPass') + WHERE User = 'vt_repl' AND Host = '%%'; +UPDATE mysql.user SET %s = PASSWORD('VtFilteredPass') + WHERE User = 'vt_filtered' AND Host = 'localhost'; +FLUSH PRIVILEGES; +''' % tuple([password_col] * 6)) + credentials = { + 'vt_dba': ['VtDbaPass'], + 'vt_app': ['VtAppPass'], + 'vt_allprivs': ['VtAllprivsPass'], + 'vt_repl': ['VtReplPass'], + 'vt_filtered': ['VtFilteredPass'], + } + db_credentials_file = environment.tmproot+'/db_credentials.json' + with open(db_credentials_file, 'w') as fd: + fd.write(json.dumps(credentials)) + + # start mysql instance external to the test + setup_procs = [ + tablet_master.init_mysql(init_db=new_init_db, + extra_args=['-db-credentials-file', + db_credentials_file]), + tablet_replica1.init_mysql(init_db=new_init_db, + extra_args=['-db-credentials-file', + db_credentials_file]), + tablet_replica2.init_mysql(init_db=new_init_db, + extra_args=['-db-credentials-file', + db_credentials_file]), + ] + if use_mysqlctld: + tablet_master.wait_for_mysqlctl_socket() + tablet_replica1.wait_for_mysqlctl_socket() + tablet_replica2.wait_for_mysqlctl_socket() + else: + utils.wait_procs(setup_procs) + except: + tearDownModule() + raise + + +def tearDownModule(): + utils.required_teardown() + if utils.options.skip_teardown: + return + + teardown_procs = [ + tablet_master.teardown_mysql(extra_args=['-db-credentials-file', + db_credentials_file]), + tablet_replica1.teardown_mysql(extra_args=['-db-credentials-file', + db_credentials_file]), + tablet_replica2.teardown_mysql(extra_args=['-db-credentials-file', + db_credentials_file]), + ] + utils.wait_procs(teardown_procs, raise_on_error=False) + + environment.topo_server().teardown() + utils.kill_sub_processes() + utils.remove_tmp_files() + + tablet_master.remove_tree() + tablet_replica1.remove_tree() + tablet_replica2.remove_tree() + + +class TestXtraBackup(unittest.TestCase): + + def setUp(self): + for t in tablet_master, tablet_replica1: + t.create_db('vt_test_keyspace') + + xtra_args = ['-db-credentials-file', db_credentials_file] + if use_xtrabackup: + xtra_args.extend(xtrabackup_args) + tablet_master.init_tablet('replica', 'test_keyspace', '0', start=True, + supports_backups=True, + extra_args=xtra_args) + tablet_replica1.init_tablet('replica', 'test_keyspace', '0', start=True, + supports_backups=True, + extra_args=xtra_args) + utils.run_vtctl(['InitShardMaster', '-force', 'test_keyspace/0', + tablet_master.tablet_alias]) + + def tearDown(self): + for t in tablet_master, tablet_replica1, tablet_replica2: + t.kill_vttablet() + + tablet.Tablet.check_vttablet_count() + environment.topo_server().wipe() + for t in [tablet_master, tablet_replica1, tablet_replica2]: + t.reset_replication() + t.set_semi_sync_enabled(master=False, slave=False) + t.clean_dbs() + + for backup in self._list_backups(): + self._remove_backup(backup) + + _create_vt_insert_test = '''create table vt_insert_test ( + id bigint auto_increment, + msg varchar(64), + primary key (id) + ) Engine=InnoDB''' + + def _insert_data(self, t, index): + """Add a single row with value 'index' to the given tablet.""" + t.mquery( + 'vt_test_keyspace', + "insert into vt_insert_test (msg) values ('test %s')" % + index, write=True) + + def _check_data(self, t, count, msg): + """Check that the specified tablet has the expected number of rows.""" + timeout = 10 + while True: + try: + result = t.mquery( + 'vt_test_keyspace', 'select count(*) from vt_insert_test') + if result[0][0] == count: + break + except MySQLdb.DatabaseError: + # ignore exceptions, we'll just timeout (the tablet creation + # can take some time to replicate, and we get a 'table vt_insert_test + # does not exist exception in some rare cases) + logging.exception('exception waiting for data to replicate') + timeout = utils.wait_step(msg, timeout) + + def _restore(self, t, tablet_type='replica'): + """Erase mysql/tablet dir, then start tablet with restore enabled.""" + self._reset_tablet_dir(t) + + xtra_args = ['-db-credentials-file', db_credentials_file] + if use_xtrabackup: + xtra_args.extend(xtrabackup_args) + + t.start_vttablet(wait_for_state='SERVING', + init_tablet_type=tablet_type, + init_keyspace='test_keyspace', + init_shard='0', + supports_backups=True, + extra_args=xtra_args) + + # check semi-sync is enabled for replica, disabled for rdonly. + if tablet_type == 'replica': + t.check_db_var('rpl_semi_sync_slave_enabled', 'ON') + t.check_db_status('rpl_semi_sync_slave_status', 'ON') + else: + t.check_db_var('rpl_semi_sync_slave_enabled', 'OFF') + t.check_db_status('rpl_semi_sync_slave_status', 'OFF') + + def _reset_tablet_dir(self, t): + """Stop mysql, delete everything including tablet dir, restart mysql.""" + extra_args = ['-db-credentials-file', db_credentials_file] + utils.wait_procs([t.teardown_mysql(extra_args=extra_args)]) + # Specify ignore_options because we want to delete the tree even + # if the test's -k / --keep-logs was specified on the command line. + t.remove_tree(ignore_options=True) + proc = t.init_mysql(init_db=new_init_db, extra_args=extra_args) + if use_mysqlctld: + t.wait_for_mysqlctl_socket() + else: + utils.wait_procs([proc]) + + def _list_backups(self): + """Get a list of backup names for the test shard.""" + backups, _ = utils.run_vtctl(tablet.get_backup_storage_flags() + + ['ListBackups', 'test_keyspace/0'], + mode=utils.VTCTL_VTCTL, trap_output=True) + return backups.splitlines() + + def _remove_backup(self, backup): + """Remove a named backup from the test shard.""" + utils.run_vtctl( + tablet.get_backup_storage_flags() + + ['RemoveBackup', 'test_keyspace/0', backup], + auto_log=True, mode=utils.VTCTL_VTCTL) + + def _restore_old_master_test(self, restore_method): + """Test that a former master replicates correctly after being restored. + + - Take a backup. + - Reparent from old master to new master. + - Force old master to restore from a previous backup using restore_method. + + Args: + restore_method: function accepting one parameter of type tablet.Tablet, + this function is called to force a restore on the provided tablet + """ + + # insert data on master, wait for slave to get it + tablet_master.mquery('vt_test_keyspace', self._create_vt_insert_test) + self._insert_data(tablet_master, 1) + self._check_data(tablet_replica1, 1, 'replica1 tablet getting data') + + # backup the slave + utils.run_vtctl(['Backup', tablet_replica1.tablet_alias], auto_log=True) + + # insert more data on the master + self._insert_data(tablet_master, 2) + + # reparent to replica1 + utils.run_vtctl(['PlannedReparentShard', + '-keyspace_shard', 'test_keyspace/0', + '-new_master', tablet_replica1.tablet_alias]) + + # insert more data on new master + self._insert_data(tablet_replica1, 3) + + # force the old master to restore at the latest backup. + restore_method(tablet_master) + + # wait for it to catch up. + self._check_data(tablet_master, 3, 'former master catches up after restore') + + def test_terminated_restore(self): + def _terminated_restore(t): + for e in utils.vtctld_connection.execute_vtctl_command( + ['RestoreFromBackup', t.tablet_alias]): + logging.info('%s', e.value) + if 'shutdown mysqld' in e.value: + break + logging.info('waiting for restore to finish') + utils.wait_for_tablet_type(t.tablet_alias, 'replica', timeout=30) + + utils.Vtctld().start() + self._restore_old_master_test(_terminated_restore) + +if __name__ == '__main__': + utils.main()