Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
53 changes: 25 additions & 28 deletions go/vt/mysqlctl/mysqlshellbackupengine.go
Original file line number Diff line number Diff line change
Expand Up @@ -56,8 +56,6 @@ var (
// disable redo logging and double write buffer
mysqlShellSpeedUpRestore = false

mysqlShellBackupBinaryName = "mysqlsh"

// use when checking if we need to create the directory on the local filesystem or not.
knownObjectStoreParams = []string{"s3BucketName", "osBucketName", "azureContainerName"}

Expand Down Expand Up @@ -87,7 +85,9 @@ type MySQLShellBackupManifest struct {
}

func init() {
BackupRestoreEngineMap[mysqlShellBackupEngineName] = &MySQLShellBackupEngine{}
BackupRestoreEngineMap[mysqlShellBackupEngineName] = &MySQLShellBackupEngine{
binaryName: "mysqlsh",
}

for _, cmd := range []string{"vtcombo", "vttablet", "vtbackup", "vttestserver", "vtctldclient"} {
servenv.OnParseFor(cmd, registerMysqlShellBackupEngineFlags)
Expand All @@ -106,6 +106,7 @@ func registerMysqlShellBackupEngineFlags(fs *pflag.FlagSet) {
// MySQLShellBackupEngine encapsulates the logic to implement the restoration
// of a mysql-shell based backup.
type MySQLShellBackupEngine struct {
binaryName string
}

const (
Expand Down Expand Up @@ -164,41 +165,37 @@ func (be *MySQLShellBackupEngine) ExecuteBackup(ctx context.Context, params Back
return BackupUnusable, vterrors.Wrap(err, "failed to fetch position")
}

cmd := exec.CommandContext(ctx, mysqlShellBackupBinaryName, args...)
cmd := exec.CommandContext(ctx, be.binaryName, args...)

params.Logger.Infof("running %s", cmd.String())

cmdOut, err := cmd.StdoutPipe()
if err != nil {
return BackupUnusable, vterrors.Wrap(err, "cannot create stdout pipe")
}
cmdOriginalErr, err := cmd.StderrPipe()
if err != nil {
return BackupUnusable, vterrors.Wrap(err, "cannot create stderr pipe")
}
if err := cmd.Start(); err != nil {
return BackupUnusable, vterrors.Wrap(err, "can't start mysqlshell")
}
stdoutReader, stdoutWriter := io.Pipe()
stderrReader, stderrWriter := io.Pipe()
lockWaiterReader, lockWaiterWriter := io.Pipe()

pipeReader, pipeWriter := io.Pipe()
cmdErr := io.TeeReader(cmdOriginalErr, pipeWriter)
cmd.Stdout = stdoutWriter
cmd.Stderr = stderrWriter
combinedErr := io.TeeReader(stderrReader, lockWaiterWriter)

cmdWg := &sync.WaitGroup{}
cmdWg.Add(3)
go releaseReadLock(ctx, pipeReader, params, cmdWg, lockAcquired)
go scanLinesToLogger(mysqlShellBackupEngineName+" stdout", cmdOut, params.Logger, cmdWg.Done)
go scanLinesToLogger(mysqlShellBackupEngineName+" stderr", cmdErr, params.Logger, cmdWg.Done)
go releaseReadLock(ctx, lockWaiterReader, params, cmdWg, lockAcquired)
go scanLinesToLogger(mysqlShellBackupEngineName+" stdout", stdoutReader, params.Logger, cmdWg.Done)
go scanLinesToLogger(mysqlShellBackupEngineName+" stderr", combinedErr, params.Logger, cmdWg.Done)

// Get exit status.
if err := cmd.Wait(); err != nil {
pipeWriter.Close() // make sure we close the writer so the goroutines above will complete.
return BackupUnusable, vterrors.Wrap(err, mysqlShellBackupEngineName+" failed")
}
// we run the command, wait for it to complete and close all pipes so the goroutines can complete on their own.
// after that we can process if an error has happened or not.
err = cmd.Run()

// close the pipeWriter and wait for the goroutines to have read all the logs
pipeWriter.Close()
stdoutWriter.Close()
stderrWriter.Close()
lockWaiterWriter.Close()
cmdWg.Wait()

if err != nil {
return BackupUnusable, vterrors.Wrap(err, mysqlShellBackupEngineName+" failed")
}

// open the MANIFEST
params.Logger.Infof("Writing backup MANIFEST")
mwc, err := bh.AddFile(ctx, backupManifestFileName, backupstorage.FileSizeUnknown)
Expand Down Expand Up @@ -371,7 +368,7 @@ func (be *MySQLShellBackupEngine) ExecuteRestore(ctx context.Context, params Res
if err := cmd.Wait(); err != nil {
return nil, vterrors.Wrap(err, mysqlShellBackupEngineName+" failed")
}
params.Logger.Infof("%s completed successfully", mysqlShellBackupBinaryName)
params.Logger.Infof("%s completed successfully", be.binaryName)

// disable local_infile now that the restore is done.
err = params.Mysqld.ExecuteSuperQuery(ctx, "SET GLOBAL LOCAL_INFILE=0")
Expand Down
18 changes: 9 additions & 9 deletions go/vt/mysqlctl/mysqlshellbackupengine_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -325,13 +325,10 @@ func generateTestFile(t *testing.T, name, contents string) {
// during ExecuteBackup(), even if the backup didn't succeed.
func TestMySQLShellBackupEngine_ExecuteBackup_ReleaseLock(t *testing.T) {
originalLocation := mysqlShellBackupLocation
originalBinary := mysqlShellBackupBinaryName
mysqlShellBackupLocation = "logical"
mysqlShellBackupBinaryName = path.Join(t.TempDir(), "test.sh")

defer func() { // restore the original values.
defer func() { // restore the original value.
mysqlShellBackupLocation = originalLocation
mysqlShellBackupBinaryName = originalBinary
}()

logger := logutil.NewMemoryLogger()
Expand All @@ -340,7 +337,6 @@ func TestMySQLShellBackupEngine_ExecuteBackup_ReleaseLock(t *testing.T) {
mysql := NewFakeMysqlDaemon(fakedb)
defer mysql.Close()

be := &MySQLShellBackupEngine{}
params := BackupParams{
TabletAlias: "test",
Logger: logger,
Expand All @@ -351,6 +347,7 @@ func TestMySQLShellBackupEngine_ExecuteBackup_ReleaseLock(t *testing.T) {
}

t.Run("lock released if we see the mysqlsh lock being acquired", func(t *testing.T) {
be := &MySQLShellBackupEngine{binaryName: path.Join(t.TempDir(), "mysqlsh.sh")}
logger.Clear()
manifestBuffer := ioutil.NewBytesBufferWriter()
bs.StartBackupReturn.BackupHandle = &FakeBackupHandle{
Expand All @@ -359,7 +356,8 @@ func TestMySQLShellBackupEngine_ExecuteBackup_ReleaseLock(t *testing.T) {
}

// this simulates mysql shell completing without any issues.
generateTestFile(t, mysqlShellBackupBinaryName, fmt.Sprintf("#!/bin/bash\n>&2 echo %s", mysqlShellLockMessage))
generateTestFile(t, be.binaryName, fmt.Sprintf(
"#!/bin/bash\n>&2 echo %s; echo \"backup completed\"; sleep 0.01", mysqlShellLockMessage))

bh, err := bs.StartBackup(context.Background(), t.TempDir(), t.Name())
require.NoError(t, err)
Expand All @@ -380,7 +378,8 @@ func TestMySQLShellBackupEngine_ExecuteBackup_ReleaseLock(t *testing.T) {
"failed to release the global lock after mysqlsh")
})

t.Run("lock released if when we don't see mysqlsh released it", func(t *testing.T) {
t.Run("lock released if we don't see mysqlsh release it", func(t *testing.T) {
be := &MySQLShellBackupEngine{binaryName: path.Join(t.TempDir(), "mysqlsh.sh")}
mysql.GlobalReadLock = false // clear lock status.
logger.Clear()
manifestBuffer := ioutil.NewBytesBufferWriter()
Expand All @@ -390,7 +389,7 @@ func TestMySQLShellBackupEngine_ExecuteBackup_ReleaseLock(t *testing.T) {
}

// this simulates mysqlshell completing, but we don't see the message that is released its lock.
generateTestFile(t, mysqlShellBackupBinaryName, "#!/bin/bash\nexit 0")
generateTestFile(t, be.binaryName, "#!/bin/bash\nexit 0")

bh, err := bs.StartBackup(context.Background(), t.TempDir(), t.Name())
require.NoError(t, err)
Expand All @@ -407,6 +406,7 @@ func TestMySQLShellBackupEngine_ExecuteBackup_ReleaseLock(t *testing.T) {
})

t.Run("lock released when backup fails", func(t *testing.T) {
be := &MySQLShellBackupEngine{binaryName: path.Join(t.TempDir(), "mysqlsh.sh")}
mysql.GlobalReadLock = false // clear lock status.
logger.Clear()
manifestBuffer := ioutil.NewBytesBufferWriter()
Expand All @@ -416,7 +416,7 @@ func TestMySQLShellBackupEngine_ExecuteBackup_ReleaseLock(t *testing.T) {
}

// this simulates the backup process failing.
generateTestFile(t, mysqlShellBackupBinaryName, "#!/bin/bash\nexit 1")
generateTestFile(t, be.binaryName, "#!/bin/bash\nexit 1")

bh, err := bs.StartBackup(context.Background(), t.TempDir(), t.Name())
require.NoError(t, err)
Expand Down
Loading