diff --git a/go/test/endtoend/backup/vtctlbackup/backup_utils.go b/go/test/endtoend/backup/vtctlbackup/backup_utils.go index 7ff9d6b860f..14c7850beb8 100644 --- a/go/test/endtoend/backup/vtctlbackup/backup_utils.go +++ b/go/test/endtoend/backup/vtctlbackup/backup_utils.go @@ -18,6 +18,7 @@ package vtctlbackup import ( "bufio" + "context" "encoding/json" "fmt" "os" @@ -52,7 +53,8 @@ const ( XtraBackup = iota BuiltinBackup Mysqlctld - timeout = time.Duration(60 * time.Second) + timeout = time.Duration(60 * time.Second) + topoConsistencyTimeout = 20 * time.Second ) var ( @@ -1077,12 +1079,13 @@ func terminateRestore(t *testing.T) { func vtctlBackupReplicaNoDestroyNoWrites(t *testing.T, replicaIndex int) (backups []string) { replica := getReplica(t, replicaIndex) + numBackups := len(waitForNumBackups(t, -1)) err := localCluster.VtctldClientProcess.ExecuteCommand("Backup", replica.Alias) require.Nil(t, err) - backups, err = localCluster.ListBackups(shardKsName) - require.Nil(t, err) + backups = waitForNumBackups(t, numBackups+1) + require.NotEmpty(t, backups) verifyTabletBackupStats(t, replica.VttabletProcess.GetVars()) @@ -1203,7 +1206,38 @@ func TestReplicaFullBackup(t *testing.T, replicaIndex int) (manifest *mysqlctl.B return readManifestFile(t, backupLocation) } +// waitForNumBackups waits for GetBackups to list exactly the given expected number. +// If expectNumBackups < 0 then any response is considered valid +func waitForNumBackups(t *testing.T, expectNumBackups int) []string { + ctx, cancel := context.WithTimeout(context.Background(), topoConsistencyTimeout) + defer cancel() + + ticker := time.NewTicker(time.Second) + defer ticker.Stop() + + for { + backups, err := localCluster.ListBackups(shardKsName) + require.NoError(t, err) + if expectNumBackups < 0 { + // any result is valid + return backups + } + if len(backups) == expectNumBackups { + // what we waited for + return backups + } + assert.Less(t, len(backups), expectNumBackups) + select { + case <-ctx.Done(): + assert.Failf(t, ctx.Err().Error(), "expected %d backups, got %d", expectNumBackups, len(backups)) + return nil + case <-ticker.C: + } + } +} + func testReplicaIncrementalBackup(t *testing.T, replica *cluster.Vttablet, incrementalFromPos replication.Position, expectError string) (manifest *mysqlctl.BackupManifest, backupName string) { + numBackups := len(waitForNumBackups(t, -1)) incrementalFromPosArg := "auto" if !incrementalFromPos.IsZero() { incrementalFromPosArg = replication.EncodePosition(incrementalFromPos) @@ -1216,8 +1250,9 @@ func testReplicaIncrementalBackup(t *testing.T, replica *cluster.Vttablet, incre } require.NoErrorf(t, err, "output: %v", output) - backups, err := localCluster.ListBackups(shardKsName) - require.NoError(t, err) + backups := waitForNumBackups(t, numBackups+1) + require.NotEmptyf(t, backups, "output: %v", output) + verifyTabletBackupStats(t, replica.VttabletProcess.GetVars()) backupName = backups[len(backups)-1] backupLocation := localCluster.CurrentVTDATAROOT + "/backups/" + shardKsName + "/" + backupName diff --git a/go/test/endtoend/backup/vtctlbackup/pitr_test_framework.go b/go/test/endtoend/backup/vtctlbackup/pitr_test_framework.go index ddeb43c7dd7..2468940b641 100644 --- a/go/test/endtoend/backup/vtctlbackup/pitr_test_framework.go +++ b/go/test/endtoend/backup/vtctlbackup/pitr_test_framework.go @@ -132,6 +132,14 @@ func ExecTestIncrementalBackupAndRestoreToPos(t *testing.T, tcase *PITRTestCase) { name: "first incremental backup", }, + { + name: "fail1", + expectError: "no binary logs to backup", + }, + { + name: "fail2", + expectError: "no binary logs to backup", + }, { name: "make writes, succeed", writeBeforeBackup: true, @@ -170,10 +178,10 @@ func ExecTestIncrementalBackupAndRestoreToPos(t *testing.T, tcase *PITRTestCase) if tc.writeBeforeBackup { InsertRowOnPrimary(t, "") } - // we wait for 1 second because backups are written to a directory named after the current timestamp, + // we wait for >1 second because backups are written to a directory named after the current timestamp, // in 1 second resolution. We want to avoid two backups that have the same pathname. Realistically this // is only ever a problem in this end-to-end test, not in production. - // Also, we gie the replica a chance to catch up. + // Also, we give the replica a chance to catch up. time.Sleep(postWriteSleepDuration) // randomly flush binary logs 0, 1 or 2 times FlushBinaryLogsOnReplica(t, 0, rand.Intn(3)) @@ -295,6 +303,14 @@ func ExecTestIncrementalBackupAndRestoreToTimestamp(t *testing.T, tcase *PITRTes { name: "first incremental backup", }, + { + name: "fail1", + expectError: "no binary logs to backup", + }, + { + name: "fail2", + expectError: "no binary logs to backup", + }, { name: "make writes, succeed", writeBeforeBackup: true, @@ -333,10 +349,10 @@ func ExecTestIncrementalBackupAndRestoreToTimestamp(t *testing.T, tcase *PITRTes if tc.writeBeforeBackup { insertRowOnPrimary(t, "") } - // we wait for 1 second because backups are written to a directory named after the current timestamp, + // we wait for >1 second because backups are written to a directory named after the current timestamp, // in 1 second resolution. We want to avoid two backups that have the same pathname. Realistically this // is only ever a problem in this end-to-end test, not in production. - // Also, we gie the replica a chance to catch up. + // Also, we give the replica a chance to catch up. time.Sleep(postWriteSleepDuration) waitForReplica(t, 0) rowsBeforeBackup := ReadRowsFromReplica(t, 0) diff --git a/go/vt/mysqlctl/backup.go b/go/vt/mysqlctl/backup.go index 1f619d57344..e1c5d04bd81 100644 --- a/go/vt/mysqlctl/backup.go +++ b/go/vt/mysqlctl/backup.go @@ -151,6 +151,7 @@ func Backup(ctx context.Context, params BackupParams) error { if err != nil { return vterrors.Wrap(err, "StartBackup failed") } + params.Logger.Infof("Starting backup %v", bh.Name()) // Scope stats to selected backup engine. beParams := params.Copy() diff --git a/go/vt/mysqlctl/builtinbackupengine.go b/go/vt/mysqlctl/builtinbackupengine.go index 8442c855c0a..1424dc5980b 100644 --- a/go/vt/mysqlctl/builtinbackupengine.go +++ b/go/vt/mysqlctl/builtinbackupengine.go @@ -328,7 +328,7 @@ func (be *BuiltinBackupEngine) executeIncrementalBackup(ctx context.Context, par // everything that's ever been applied, and a subset of that is gtid_purged, which are the event no longer available in binary logs. // When we consider Vitess incremental backups, what's important for us is "what's the GTIDSet that's true when this backup was taken, // and which will be true when we restore this backup". The answer to this is the GTIDSet that includes the purged GTIDs. - // It's also nice for icnremental backups that are taken on _other_ tablets, so that they don't need to understand what exactly was purged + // It's also nice for incremental backups that are taken on _other_ tablets, so that they don't need to understand what exactly was purged // on _this_ tablet. They don't care, all they want to know is "what GTIDSet can we get from this". incrementalBackupToPosition.GTIDSet = incrementalBackupToPosition.GTIDSet.Union(gtidPurged.GTIDSet) req := &mysqlctl.ReadBinlogFilesTimestampsRequest{} @@ -345,7 +345,7 @@ func (be *BuiltinBackupEngine) executeIncrementalBackup(ctx context.Context, par return false, vterrors.Wrapf(err, "reading timestamps from binlog files %v", binaryLogsToBackup) } if resp.FirstTimestampBinlog == "" || resp.LastTimestampBinlog == "" { - return false, vterrors.Wrapf(err, "empty binlog name in response. Request=%v, Response=%v", req, resp) + return false, vterrors.Errorf(vtrpc.Code_ABORTED, "empty binlog name in response. Request=%v, Response=%v", req, resp) } incrDetails := &IncrementalBackupDetails{ FirstTimestamp: FormatRFC3339(logutil.ProtoToTime(resp.FirstTimestamp)), diff --git a/go/vt/mysqlctl/mysqld.go b/go/vt/mysqlctl/mysqld.go index 716eacee1cf..b1db3ec425a 100644 --- a/go/vt/mysqlctl/mysqld.go +++ b/go/vt/mysqlctl/mysqld.go @@ -1321,6 +1321,60 @@ func parseBinlogEntryTimestamp(logEntry string) (found bool, t time.Time, err er return false, t, nil } +// scanBinlogTimestamp invokes a `mysqlbinlog` binary to look for a timestamp in the given binary. The function +// either looks for the first such timestamp or the last. +func (mysqld *Mysqld) scanBinlogTimestamp(mysqlbinlogDir string, mysqlbinlogEnv []string, mysqlbinlogName string, binlogFile string, stopAtFirst bool) (matchedTime time.Time, matchFound bool, err error) { + args := []string{binlogFile} + mysqlbinlogCmd := exec.Command(mysqlbinlogName, args...) + mysqlbinlogCmd.Dir = mysqlbinlogDir + mysqlbinlogCmd.Env = mysqlbinlogEnv + log.Infof("ApplyBinlogFile: running mysqlbinlog command: %#v", mysqlbinlogCmd) + pipe, err := mysqlbinlogCmd.StdoutPipe() // to be piped into mysql + if err != nil { + return matchedTime, false, err + } + scanComplete := make(chan error) + intentionalKill := false + scan := func() { + defer close(scanComplete) + defer func() { + intentionalKill = true + mysqlbinlogCmd.Process.Kill() // ensures the binlog file is released + }() + // Read line by line and process it + scanner := bufio.NewScanner(pipe) + for scanner.Scan() { + logEntry := scanner.Text() + + found, t, err := parseBinlogEntryTimestamp(logEntry) + if err != nil { + scanComplete <- err + return + } + if found { + matchedTime = t + matchFound = true + } + if found && stopAtFirst { + // Found the first timestamp and it's all we need. We won't scan any further and so we should also + // kill mysqlbinlog (otherwise it keeps waiting until we've read the entire pipe). + return + } + } + } + if err := mysqlbinlogCmd.Start(); err != nil { + return matchedTime, false, err + } + go scan() + if err := mysqlbinlogCmd.Wait(); err != nil && !intentionalKill { + return matchedTime, false, vterrors.Wrapf(err, "waiting on mysqlbinlog command in ReadBinlogFilesTimestamps") + } + if err := <-scanComplete; err != nil { + return matchedTime, false, vterrors.Wrapf(err, "scanning mysqlbinlog output in ReadBinlogFilesTimestamps ") + } + return matchedTime, matchFound, nil +} + // ReadBinlogFilesTimestamps reads all given binlog files via `mysqlbinlog` command and returns the first and last found transaction timestamps func (mysqld *Mysqld) ReadBinlogFilesTimestamps(ctx context.Context, req *mysqlctlpb.ReadBinlogFilesTimestampsRequest) (*mysqlctlpb.ReadBinlogFilesTimestampsResponse, error) { if len(req.BinlogFileNames) == 0 { @@ -1335,8 +1389,6 @@ func (mysqld *Mysqld) ReadBinlogFilesTimestamps(ctx context.Context, req *mysqlc defer client.Close() return client.ReadBinlogFilesTimestamps(ctx, req) } - var mysqlbinlogCmd *exec.Cmd - dir, err := vtenv.VtMysqlRoot() if err != nil { return nil, err @@ -1350,59 +1402,10 @@ func (mysqld *Mysqld) ReadBinlogFilesTimestamps(ctx context.Context, req *mysqlc return nil, err } - scanTimestamp := func(binlogFile string, stopAtFirst bool) (matchedTime time.Time, matchFound bool, err error) { - args := []string{binlogFile} - mysqlbinlogCmd = exec.Command(mysqlbinlogName, args...) - mysqlbinlogCmd.Dir = dir - mysqlbinlogCmd.Env = env - log.Infof("ApplyBinlogFile: running mysqlbinlog command: %#v", mysqlbinlogCmd) - pipe, err := mysqlbinlogCmd.StdoutPipe() // to be piped into mysql - if err != nil { - return matchedTime, false, err - } - scanner := bufio.NewScanner(pipe) - scanComplete := make(chan error) - intentionalKill := false - scan := func() { - defer close(scanComplete) - // Read line by line and process it - for scanner.Scan() { - logEntry := scanner.Text() - - found, t, err := parseBinlogEntryTimestamp(logEntry) - if err != nil { - scanComplete <- err - return - } - if found { - matchedTime = t - matchFound = true - } - if found && stopAtFirst { - // Found the first timestamp and it's all we need. We won't scan any further and so we should also - // kill mysqlbinlog (otherwise it keeps waiting until we've read the entire pipe). - intentionalKill = true - mysqlbinlogCmd.Process.Kill() - return - } - } - } - if err := mysqlbinlogCmd.Start(); err != nil { - return matchedTime, false, err - } - go scan() - if err := mysqlbinlogCmd.Wait(); err != nil && !intentionalKill { - return matchedTime, false, vterrors.Wrapf(err, "waiting on mysqlbinlog command in ReadBinlogFilesTimestamps") - } - if err := <-scanComplete; err != nil { - return matchedTime, false, vterrors.Wrapf(err, "scanning mysqlbinlog output in ReadBinlogFilesTimestamps ") - } - return matchedTime, matchFound, nil - } resp := &mysqlctlpb.ReadBinlogFilesTimestampsResponse{} // Find first timestamp for _, binlogFile := range req.BinlogFileNames { - t, found, err := scanTimestamp(binlogFile, true) + t, found, err := mysqld.scanBinlogTimestamp(dir, env, mysqlbinlogName, binlogFile, true) if err != nil { return nil, err } @@ -1415,7 +1418,7 @@ func (mysqld *Mysqld) ReadBinlogFilesTimestamps(ctx context.Context, req *mysqlc // Find last timestamp for i := len(req.BinlogFileNames) - 1; i >= 0; i-- { binlogFile := req.BinlogFileNames[i] - t, found, err := scanTimestamp(binlogFile, false) + t, found, err := mysqld.scanBinlogTimestamp(dir, env, mysqlbinlogName, binlogFile, false) if err != nil { return nil, err }