diff --git a/go/ioutil/timeout_closer.go b/go/ioutil/timeout_closer.go new file mode 100644 index 00000000000..0545d81cd00 --- /dev/null +++ b/go/ioutil/timeout_closer.go @@ -0,0 +1,56 @@ +/* +Copyright 2023 The Vitess Authors. + +Licensed under the Apache License, Version 2.0 (the "License"); +you may not use this file except in compliance with the License. +You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, software +distributed under the License is distributed on an "AS IS" BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +See the License for the specific language governing permissions and +limitations under the License. +*/ + +package ioutil + +import ( + "context" + "io" + "time" +) + +// TimeoutCloser is an io.Closer that has a timeout for executing the Close() function. +type TimeoutCloser struct { + ctx context.Context + closer io.Closer + timeout time.Duration +} + +func NewTimeoutCloser(ctx context.Context, closer io.Closer, timeout time.Duration) *TimeoutCloser { + return &TimeoutCloser{ + ctx: ctx, + closer: closer, + timeout: timeout, + } +} + +func (c *TimeoutCloser) Close() error { + done := make(chan error) + + ctx, cancel := context.WithTimeout(c.ctx, c.timeout) + defer cancel() + + go func() { + defer close(done) + done <- c.closer.Close() + }() + select { + case err := <-done: + return err + case <-ctx.Done(): + return ctx.Err() + } +} diff --git a/go/ioutil/timeout_closer_test.go b/go/ioutil/timeout_closer_test.go new file mode 100644 index 00000000000..9aabe307c85 --- /dev/null +++ b/go/ioutil/timeout_closer_test.go @@ -0,0 +1,53 @@ +/* +Copyright 2023 The Vitess Authors. + +Licensed under the Apache License, Version 2.0 (the "License"); +you may not use this file except in compliance with the License. +You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, software +distributed under the License is distributed on an "AS IS" BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +See the License for the specific language governing permissions and +limitations under the License. +*/ + +package ioutil + +import ( + "context" + "testing" + "time" + + "github.com/stretchr/testify/assert" + "github.com/stretchr/testify/require" +) + +type hangCloser struct { + hang bool +} + +func (c hangCloser) Close() error { + if c.hang { + ch := make(chan bool) + ch <- true // hang forever + } + return nil +} + +func TestTimeoutCloser(t *testing.T) { + ctx := context.Background() + { + closer := NewTimeoutCloser(ctx, &hangCloser{hang: false}, time.Second) + err := closer.Close() + require.NoError(t, err) + } + { + closer := NewTimeoutCloser(ctx, &hangCloser{hang: true}, time.Second) + err := closer.Close() + require.Error(t, err) + assert.ErrorIs(t, err, context.DeadlineExceeded) + } +} diff --git a/go/vt/mysqlctl/backup.go b/go/vt/mysqlctl/backup.go index c07c15d16b5..1f619d57344 100644 --- a/go/vt/mysqlctl/backup.go +++ b/go/vt/mysqlctl/backup.go @@ -57,6 +57,11 @@ const ( RestoreState = "restore_in_progress" // BackupTimestampFormat is the format in which we save BackupTime and FinishedTime BackupTimestampFormat = "2006-01-02.150405" + + // closeTimeout is the timeout for closing backup files after writing. + // The value is a bit arbitrary. How long does it make sense to wait for a Close()? With a cloud-based implementation, + // network might be an issue. _Seconds_ are probably too short. The whereabouts of a minute us a reasonable value. + closeTimeout = 1 * time.Minute ) const ( diff --git a/go/vt/mysqlctl/backup_blackbox_test.go b/go/vt/mysqlctl/backup_blackbox_test.go new file mode 100644 index 00000000000..f7af0cdc306 --- /dev/null +++ b/go/vt/mysqlctl/backup_blackbox_test.go @@ -0,0 +1,579 @@ +/* +Copyright 2022 The Vitess Authors. + +Licensed under the Apache License, Version 2.0 (the "License"); +you may not use this file except in compliance with the License. +You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, software +distributed under the License is distributed on an "AS IS" BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +See the License for the specific language governing permissions and +limitations under the License. +*/ + +// Package mysqlctl_test is the blackbox tests for package mysqlctl. +package mysqlctl_test + +import ( + "context" + "fmt" + "os" + "path" + "strings" + "testing" + "time" + + "github.com/stretchr/testify/assert" + "github.com/stretchr/testify/require" + + "vitess.io/vitess/go/mysql/replication" + + "vitess.io/vitess/go/sqltypes" + + "vitess.io/vitess/go/mysql" + "vitess.io/vitess/go/mysql/fakesqldb" + "vitess.io/vitess/go/vt/logutil" + "vitess.io/vitess/go/vt/mysqlctl" + "vitess.io/vitess/go/vt/mysqlctl/backupstats" + "vitess.io/vitess/go/vt/mysqlctl/filebackupstorage" + "vitess.io/vitess/go/vt/proto/topodata" + "vitess.io/vitess/go/vt/proto/vttime" + "vitess.io/vitess/go/vt/topo" + "vitess.io/vitess/go/vt/topo/memorytopo" +) + +func setBuiltinBackupMysqldDeadline(t time.Duration) time.Duration { + old := mysqlctl.BuiltinBackupMysqldTimeout + mysqlctl.BuiltinBackupMysqldTimeout = t + + return old +} + +func createBackupDir(root string, dirs ...string) error { + for _, dir := range dirs { + if err := os.MkdirAll(path.Join(root, dir), 0755); err != nil { + return err + } + } + + return nil +} + +func createBackupFiles(root string, fileCount int, ext string) error { + for i := 0; i < fileCount; i++ { + f, err := os.Create(path.Join(root, fmt.Sprintf("%d.%s", i, ext))) + if err != nil { + return err + } + if _, err := f.Write([]byte("hello, world!")); err != nil { + return err + } + defer f.Close() + } + + return nil +} + +func TestExecuteBackup(t *testing.T) { + // Set up local backup directory + backupRoot := "testdata/builtinbackup_test" + filebackupstorage.FileBackupStorageRoot = backupRoot + require.NoError(t, createBackupDir(backupRoot, "innodb", "log", "datadir")) + dataDir := path.Join(backupRoot, "datadir") + // Add some files under data directory to force backup to actually backup files. + require.NoError(t, createBackupDir(dataDir, "test1")) + require.NoError(t, createBackupDir(dataDir, "test2")) + require.NoError(t, createBackupFiles(path.Join(dataDir, "test1"), 2, "ibd")) + require.NoError(t, createBackupFiles(path.Join(dataDir, "test2"), 2, "ibd")) + defer os.RemoveAll(backupRoot) + + ctx := context.Background() + + needIt, err := needInnoDBRedoLogSubdir() + require.NoError(t, err) + if needIt { + fpath := path.Join("log", mysql.DynamicRedoLogSubdir) + if err := createBackupDir(backupRoot, fpath); err != nil { + require.Failf(t, err.Error(), "failed to create directory: %s", fpath) + } + } + + // Set up topo + keyspace, shard := "mykeyspace", "-80" + ts := memorytopo.NewServer("cell1") + defer ts.Close() + + require.NoError(t, ts.CreateKeyspace(ctx, keyspace, &topodata.Keyspace{})) + require.NoError(t, ts.CreateShard(ctx, keyspace, shard)) + + tablet := topo.NewTablet(100, "cell1", "mykeyspace-00-80-0100") + tablet.Keyspace = keyspace + tablet.Shard = shard + + require.NoError(t, ts.CreateTablet(ctx, tablet)) + + _, err = ts.UpdateShardFields(ctx, keyspace, shard, func(si *topo.ShardInfo) error { + si.PrimaryAlias = &topodata.TabletAlias{Uid: 100, Cell: "cell1"} + + now := time.Now() + si.PrimaryTermStartTime = &vttime.Time{Seconds: int64(now.Second()), Nanoseconds: int32(now.Nanosecond())} + + return nil + }) + require.NoError(t, err) + + be := &mysqlctl.BuiltinBackupEngine{} + + // Configure a tight deadline to force a timeout + oldDeadline := setBuiltinBackupMysqldDeadline(time.Second) + defer setBuiltinBackupMysqldDeadline(oldDeadline) + + bh := filebackupstorage.NewBackupHandle(nil, "", "", false) + + // Spin up a fake daemon to be used in backups. It needs to be allowed to receive: + // "STOP SLAVE", "START SLAVE", in that order. + mysqld := mysqlctl.NewFakeMysqlDaemon(fakesqldb.New(t)) + mysqld.ExpectedExecuteSuperQueryList = []string{"STOP SLAVE", "START SLAVE"} + // mysqld.ShutdownTime = time.Minute + + fakeStats := backupstats.NewFakeStats() + + ok, err := be.ExecuteBackup(ctx, mysqlctl.BackupParams{ + Logger: logutil.NewConsoleLogger(), + Mysqld: mysqld, + Cnf: &mysqlctl.Mycnf{ + InnodbDataHomeDir: path.Join(backupRoot, "innodb"), + InnodbLogGroupHomeDir: path.Join(backupRoot, "log"), + DataDir: path.Join(backupRoot, "datadir"), + }, + Concurrency: 2, + HookExtraEnv: map[string]string{}, + TopoServer: ts, + Keyspace: keyspace, + Shard: shard, + Stats: fakeStats, + }, bh) + + require.NoError(t, err) + assert.True(t, ok) + + var destinationCloseStats int + var destinationOpenStats int + var destinationWriteStats int + var sourceCloseStats int + var sourceOpenStats int + var sourceReadStats int + + for _, sr := range fakeStats.ScopeReturns { + sfs := sr.(*backupstats.FakeStats) + switch sfs.ScopeV[backupstats.ScopeOperation] { + case "Destination:Close": + destinationCloseStats++ + require.Len(t, sfs.TimedIncrementCalls, 1) + case "Destination:Open": + destinationOpenStats++ + require.Len(t, sfs.TimedIncrementCalls, 1) + case "Destination:Write": + destinationWriteStats++ + require.GreaterOrEqual(t, len(sfs.TimedIncrementBytesCalls), 1) + case "Source:Close": + sourceCloseStats++ + require.Len(t, sfs.TimedIncrementCalls, 1) + case "Source:Open": + sourceOpenStats++ + require.Len(t, sfs.TimedIncrementCalls, 1) + case "Source:Read": + sourceReadStats++ + require.GreaterOrEqual(t, len(sfs.TimedIncrementBytesCalls), 1) + } + } + + require.Equal(t, 4, destinationCloseStats) + require.Equal(t, 4, destinationOpenStats) + require.Equal(t, 4, destinationWriteStats) + require.Equal(t, 4, sourceCloseStats) + require.Equal(t, 4, sourceOpenStats) + require.Equal(t, 4, sourceReadStats) + + mysqld.ExpectedExecuteSuperQueryCurrent = 0 // resest the index of what queries we've run + mysqld.ShutdownTime = time.Minute // reminder that shutdownDeadline is 1s + + ok, err = be.ExecuteBackup(ctx, mysqlctl.BackupParams{ + Logger: logutil.NewConsoleLogger(), + Mysqld: mysqld, + Cnf: &mysqlctl.Mycnf{ + InnodbDataHomeDir: path.Join(backupRoot, "innodb"), + InnodbLogGroupHomeDir: path.Join(backupRoot, "log"), + DataDir: path.Join(backupRoot, "datadir"), + }, + HookExtraEnv: map[string]string{}, + TopoServer: ts, + Keyspace: keyspace, + Shard: shard, + }, bh) + + assert.Error(t, err) + assert.False(t, ok) +} + +func TestExecuteBackupWithSafeUpgrade(t *testing.T) { + // Set up local backup directory + backupRoot := "testdata/builtinbackup_test" + filebackupstorage.FileBackupStorageRoot = backupRoot + require.NoError(t, createBackupDir(backupRoot, "innodb", "log", "datadir")) + dataDir := path.Join(backupRoot, "datadir") + // Add some files under data directory to force backup to actually backup files. + require.NoError(t, createBackupDir(dataDir, "test1")) + require.NoError(t, createBackupDir(dataDir, "test2")) + require.NoError(t, createBackupFiles(path.Join(dataDir, "test1"), 2, "ibd")) + require.NoError(t, createBackupFiles(path.Join(dataDir, "test2"), 2, "ibd")) + defer os.RemoveAll(backupRoot) + + ctx := context.Background() + + needIt, err := needInnoDBRedoLogSubdir() + require.NoError(t, err) + if needIt { + fpath := path.Join("log", mysql.DynamicRedoLogSubdir) + if err := createBackupDir(backupRoot, fpath); err != nil { + require.Failf(t, err.Error(), "failed to create directory: %s", fpath) + } + } + + // Set up topo + keyspace, shard := "mykeyspace", "-80" + ts := memorytopo.NewServer("cell1") + defer ts.Close() + + require.NoError(t, ts.CreateKeyspace(ctx, keyspace, &topodata.Keyspace{})) + require.NoError(t, ts.CreateShard(ctx, keyspace, shard)) + + tablet := topo.NewTablet(100, "cell1", "mykeyspace-00-80-0100") + tablet.Keyspace = keyspace + tablet.Shard = shard + + require.NoError(t, ts.CreateTablet(ctx, tablet)) + + _, err = ts.UpdateShardFields(ctx, keyspace, shard, func(si *topo.ShardInfo) error { + si.PrimaryAlias = &topodata.TabletAlias{Uid: 100, Cell: "cell1"} + + now := time.Now() + si.PrimaryTermStartTime = &vttime.Time{Seconds: int64(now.Second()), Nanoseconds: int32(now.Nanosecond())} + + return nil + }) + require.NoError(t, err) + + be := &mysqlctl.BuiltinBackupEngine{} + + // Configure a tight deadline to force a timeout + oldDeadline := setBuiltinBackupMysqldDeadline(time.Second) + defer setBuiltinBackupMysqldDeadline(oldDeadline) + + bh := filebackupstorage.NewBackupHandle(nil, "", "", false) + + // Spin up a fake daemon to be used in backups. It needs to be allowed to receive: + // "STOP SLAVE", "START SLAVE", in that order. + // It also needs to be allowed to receive the query to disable the innodb_fast_shutdown flag. + mysqld := mysqlctl.NewFakeMysqlDaemon(fakesqldb.New(t)) + mysqld.ExpectedExecuteSuperQueryList = []string{"STOP SLAVE", "START SLAVE"} + mysqld.FetchSuperQueryMap = map[string]*sqltypes.Result{ + "SET GLOBAL innodb_fast_shutdown=0": {}, + } + + ok, err := be.ExecuteBackup(ctx, mysqlctl.BackupParams{ + Logger: logutil.NewConsoleLogger(), + Mysqld: mysqld, + Cnf: &mysqlctl.Mycnf{ + InnodbDataHomeDir: path.Join(backupRoot, "innodb"), + InnodbLogGroupHomeDir: path.Join(backupRoot, "log"), + DataDir: path.Join(backupRoot, "datadir"), + }, + Concurrency: 2, + TopoServer: ts, + Keyspace: keyspace, + Shard: shard, + Stats: backupstats.NewFakeStats(), + UpgradeSafe: true, + }, bh) + + require.NoError(t, err) + assert.True(t, ok) +} + +// TestExecuteBackupWithCanceledContext tests the ability of the backup function to gracefully handle cases where errors +// occur due to various reasons, such as context time cancel. The process should not panic in these situations. +func TestExecuteBackupWithCanceledContext(t *testing.T) { + // Set up local backup directory + id := fmt.Sprintf("%d", time.Now().UnixNano()) + backupRoot := fmt.Sprintf("testdata/builtinbackup_test_%s", id) + filebackupstorage.FileBackupStorageRoot = backupRoot + require.NoError(t, createBackupDir(backupRoot, "innodb", "log", "datadir")) + dataDir := path.Join(backupRoot, "datadir") + // Add some files under data directory to force backup to execute semaphore acquire inside + // backupFiles() method (https://github.com/vitessio/vitess/blob/main/go/vt/mysqlctl/builtinbackupengine.go#L483). + require.NoError(t, createBackupDir(dataDir, "test1")) + require.NoError(t, createBackupDir(dataDir, "test2")) + require.NoError(t, createBackupFiles(path.Join(dataDir, "test1"), 2, "ibd")) + require.NoError(t, createBackupFiles(path.Join(dataDir, "test2"), 2, "ibd")) + defer os.RemoveAll(backupRoot) + + // Cancel the context deliberately + ctx, cancel := context.WithCancel(context.Background()) + cancel() + needIt, err := needInnoDBRedoLogSubdir() + require.NoError(t, err) + if needIt { + fpath := path.Join("log", mysql.DynamicRedoLogSubdir) + if err := createBackupDir(backupRoot, fpath); err != nil { + require.Failf(t, err.Error(), "failed to create directory: %s", fpath) + } + } + + // Set up topo + keyspace, shard := "mykeyspace", "-80" + ts := memorytopo.NewServer("cell1") + defer ts.Close() + + require.NoError(t, ts.CreateKeyspace(ctx, keyspace, &topodata.Keyspace{})) + require.NoError(t, ts.CreateShard(ctx, keyspace, shard)) + + tablet := topo.NewTablet(100, "cell1", "mykeyspace-00-80-0100") + tablet.Keyspace = keyspace + tablet.Shard = shard + + require.NoError(t, ts.CreateTablet(ctx, tablet)) + + _, err = ts.UpdateShardFields(ctx, keyspace, shard, func(si *topo.ShardInfo) error { + si.PrimaryAlias = &topodata.TabletAlias{Uid: 100, Cell: "cell1"} + + now := time.Now() + si.PrimaryTermStartTime = &vttime.Time{Seconds: int64(now.Second()), Nanoseconds: int32(now.Nanosecond())} + + return nil + }) + require.NoError(t, err) + + be := &mysqlctl.BuiltinBackupEngine{} + bh := filebackupstorage.NewBackupHandle(nil, "", "", false) + // Spin up a fake daemon to be used in backups. It needs to be allowed to receive: + // "STOP SLAVE", "START SLAVE", in that order. + mysqld := mysqlctl.NewFakeMysqlDaemon(fakesqldb.New(t)) + mysqld.ExpectedExecuteSuperQueryList = []string{"STOP SLAVE", "START SLAVE"} + + ok, err := be.ExecuteBackup(ctx, mysqlctl.BackupParams{ + Logger: logutil.NewConsoleLogger(), + Mysqld: mysqld, + Cnf: &mysqlctl.Mycnf{ + InnodbDataHomeDir: path.Join(backupRoot, "innodb"), + InnodbLogGroupHomeDir: path.Join(backupRoot, "log"), + DataDir: path.Join(backupRoot, "datadir"), + }, + Stats: backupstats.NewFakeStats(), + Concurrency: 2, + HookExtraEnv: map[string]string{}, + TopoServer: ts, + Keyspace: keyspace, + Shard: shard, + }, bh) + + require.Error(t, err) + // all four files will fail + require.ErrorContains(t, err, "context canceled;context canceled;context canceled;context canceled") + assert.False(t, ok) +} + +// TestExecuteRestoreWithCanceledContext tests the ability of the restore function to gracefully handle cases where errors +// occur due to various reasons, such as context timed-out. The process should not panic in these situations. +func TestExecuteRestoreWithTimedOutContext(t *testing.T) { + // Set up local backup directory + id := fmt.Sprintf("%d", time.Now().UnixNano()) + backupRoot := fmt.Sprintf("testdata/builtinbackup_test_%s", id) + filebackupstorage.FileBackupStorageRoot = backupRoot + require.NoError(t, createBackupDir(backupRoot, "innodb", "log", "datadir")) + dataDir := path.Join(backupRoot, "datadir") + // Add some files under data directory to force backup to execute semaphore acquire inside + // backupFiles() method (https://github.com/vitessio/vitess/blob/main/go/vt/mysqlctl/builtinbackupengine.go#L483). + require.NoError(t, createBackupDir(dataDir, "test1")) + require.NoError(t, createBackupDir(dataDir, "test2")) + require.NoError(t, createBackupFiles(path.Join(dataDir, "test1"), 2, "ibd")) + require.NoError(t, createBackupFiles(path.Join(dataDir, "test2"), 2, "ibd")) + defer os.RemoveAll(backupRoot) + + ctx := context.Background() + needIt, err := needInnoDBRedoLogSubdir() + require.NoError(t, err) + if needIt { + fpath := path.Join("log", mysql.DynamicRedoLogSubdir) + if err := createBackupDir(backupRoot, fpath); err != nil { + require.Failf(t, err.Error(), "failed to create directory: %s", fpath) + } + } + + // Set up topo + keyspace, shard := "mykeyspace", "-80" + ts := memorytopo.NewServer("cell1") + defer ts.Close() + + require.NoError(t, ts.CreateKeyspace(ctx, keyspace, &topodata.Keyspace{})) + require.NoError(t, ts.CreateShard(ctx, keyspace, shard)) + + tablet := topo.NewTablet(100, "cell1", "mykeyspace-00-80-0100") + tablet.Keyspace = keyspace + tablet.Shard = shard + + require.NoError(t, ts.CreateTablet(ctx, tablet)) + + _, err = ts.UpdateShardFields(ctx, keyspace, shard, func(si *topo.ShardInfo) error { + si.PrimaryAlias = &topodata.TabletAlias{Uid: 100, Cell: "cell1"} + + now := time.Now() + si.PrimaryTermStartTime = &vttime.Time{Seconds: int64(now.Second()), Nanoseconds: int32(now.Nanosecond())} + + return nil + }) + require.NoError(t, err) + + be := &mysqlctl.BuiltinBackupEngine{} + bh := filebackupstorage.NewBackupHandle(nil, "", "", false) + // Spin up a fake daemon to be used in backups. It needs to be allowed to receive: + // "STOP SLAVE", "START SLAVE", in that order. + mysqld := mysqlctl.NewFakeMysqlDaemon(fakesqldb.New(t)) + mysqld.ExpectedExecuteSuperQueryList = []string{"STOP SLAVE", "START SLAVE"} + + ok, err := be.ExecuteBackup(ctx, mysqlctl.BackupParams{ + Logger: logutil.NewConsoleLogger(), + Mysqld: mysqld, + Cnf: &mysqlctl.Mycnf{ + InnodbDataHomeDir: path.Join(backupRoot, "innodb"), + InnodbLogGroupHomeDir: path.Join(backupRoot, "log"), + DataDir: path.Join(backupRoot, "datadir"), + }, + Stats: backupstats.NewFakeStats(), + Concurrency: 2, + HookExtraEnv: map[string]string{}, + TopoServer: ts, + Keyspace: keyspace, + Shard: shard, + }, bh) + + require.NoError(t, err) + assert.True(t, ok) + + // Now try to restore the above backup. + bh = filebackupstorage.NewBackupHandle(nil, "", "", true) + mysqld = mysqlctl.NewFakeMysqlDaemon(fakesqldb.New(t)) + mysqld.ExpectedExecuteSuperQueryList = []string{"STOP SLAVE", "START SLAVE"} + + fakeStats := backupstats.NewFakeStats() + + restoreParams := mysqlctl.RestoreParams{ + Cnf: &mysqlctl.Mycnf{ + InnodbDataHomeDir: path.Join(backupRoot, "innodb"), + InnodbLogGroupHomeDir: path.Join(backupRoot, "log"), + DataDir: path.Join(backupRoot, "datadir"), + BinLogPath: path.Join(backupRoot, "binlog"), + RelayLogPath: path.Join(backupRoot, "relaylog"), + RelayLogIndexPath: path.Join(backupRoot, "relaylogindex"), + RelayLogInfoPath: path.Join(backupRoot, "relayloginfo"), + }, + Logger: logutil.NewConsoleLogger(), + Mysqld: mysqld, + Concurrency: 2, + HookExtraEnv: map[string]string{}, + DeleteBeforeRestore: false, + DbName: "test", + Keyspace: "test", + Shard: "-", + StartTime: time.Now(), + RestoreToPos: replication.Position{}, + RestoreToTimestamp: time.Time{}, + DryRun: false, + Stats: fakeStats, + } + + // Successful restore. + bm, err := be.ExecuteRestore(ctx, restoreParams, bh) + assert.NoError(t, err) + assert.NotNil(t, bm) + + var destinationCloseStats int + var destinationOpenStats int + var destinationWriteStats int + var sourceCloseStats int + var sourceOpenStats int + var sourceReadStats int + + for _, sr := range fakeStats.ScopeReturns { + sfs := sr.(*backupstats.FakeStats) + switch sfs.ScopeV[backupstats.ScopeOperation] { + case "Destination:Close": + destinationCloseStats++ + require.Len(t, sfs.TimedIncrementCalls, 1) + case "Destination:Open": + destinationOpenStats++ + require.Len(t, sfs.TimedIncrementCalls, 1) + case "Destination:Write": + destinationWriteStats++ + require.GreaterOrEqual(t, len(sfs.TimedIncrementBytesCalls), 1) + case "Source:Close": + sourceCloseStats++ + require.Len(t, sfs.TimedIncrementCalls, 1) + case "Source:Open": + sourceOpenStats++ + require.Len(t, sfs.TimedIncrementCalls, 1) + case "Source:Read": + sourceReadStats++ + require.GreaterOrEqual(t, len(sfs.TimedIncrementBytesCalls), 1) + } + } + + require.Equal(t, 4, destinationCloseStats) + require.Equal(t, 4, destinationOpenStats) + require.Equal(t, 4, destinationWriteStats) + require.Equal(t, 4, sourceCloseStats) + require.Equal(t, 4, sourceOpenStats) + require.Equal(t, 4, sourceReadStats) + + // Restore using timed-out context + mysqld = mysqlctl.NewFakeMysqlDaemon(fakesqldb.New(t)) + mysqld.ExpectedExecuteSuperQueryList = []string{"STOP SLAVE", "START SLAVE"} + restoreParams.Mysqld = mysqld + timedOutCtx, cancel := context.WithTimeout(ctx, 1*time.Second) + defer cancel() + // Let the context time out. + time.Sleep(1 * time.Second) + bm, err = be.ExecuteRestore(timedOutCtx, restoreParams, bh) + // ExecuteRestore should fail. + assert.Error(t, err) + assert.Nil(t, bm) + // error message can contain any combination of "context deadline exceeded" or "context canceled" + if !strings.Contains(err.Error(), "context canceled") && !strings.Contains(err.Error(), "context deadline exceeded") { + assert.Fail(t, "Test should fail with either `context canceled` or `context deadline exceeded`") + } +} + +// needInnoDBRedoLogSubdir indicates whether we need to create a redo log subdirectory. +// Starting with MySQL 8.0.30, the InnoDB redo logs are stored in a subdirectory of the +// (/. by default) called "#innodb_redo". See: +// +// https://dev.mysql.com/doc/refman/8.0/en/innodb-redo-log.html#innodb-modifying-redo-log-capacity +func needInnoDBRedoLogSubdir() (needIt bool, err error) { + mysqldVersionStr, err := mysqlctl.GetVersionString() + if err != nil { + return needIt, err + } + _, sv, err := mysqlctl.ParseVersionString(mysqldVersionStr) + if err != nil { + return needIt, err + } + versionStr := fmt.Sprintf("%d.%d.%d", sv.Major, sv.Minor, sv.Patch) + _, capableOf, _ := mysql.GetFlavor(versionStr, nil) + if capableOf == nil { + return needIt, fmt.Errorf("cannot determine database flavor details for version %s", versionStr) + } + return capableOf(mysql.DynamicRedoLogCapacityFlavorCapability) +} diff --git a/go/vt/mysqlctl/builtinbackupengine.go b/go/vt/mysqlctl/builtinbackupengine.go index b1593a815b2..e8aac724442 100644 --- a/go/vt/mysqlctl/builtinbackupengine.go +++ b/go/vt/mysqlctl/builtinbackupengine.go @@ -21,6 +21,7 @@ import ( "context" "encoding/hex" "encoding/json" + "errors" "fmt" "hash" "hash/crc32" @@ -760,6 +761,8 @@ func (bp *backupPipe) ReportProgress(period time.Duration, logger logutil.Logger // backupFile backs up an individual file. func (be *BuiltinBackupEngine) backupFile(ctx context.Context, params BackupParams, bh backupstorage.BackupHandle, fe *FileEntry, name string) (finalErr error) { + ctx, cancel := context.WithCancel(ctx) + defer cancel() // Open the source file for reading. openSourceAt := time.Now() source, err := fe.open(params.Cnf, true) @@ -797,12 +800,9 @@ func (be *BuiltinBackupEngine) backupFile(ctx context.Context, params BackupPara defer func(name, fileName string) { closeDestAt := time.Now() if rerr := dest.Close(); rerr != nil { - if finalErr != nil { - // We already have an error, just log this one. - params.Logger.Errorf2(rerr, "failed to close file %v,%v", name, fe.Name) - } else { - finalErr = rerr - } + rerr = vterrors.Wrapf(rerr, "failed to close file %v,%v", name, fe.Name) + params.Logger.Error(rerr) + finalErr = errors.Join(finalErr, rerr) } params.Stats.Scope(stats.Operation("Destination:Close")).TimedIncrement(time.Since(closeDestAt)) }(name, fe.Name) @@ -812,43 +812,57 @@ func (be *BuiltinBackupEngine) backupFile(ctx context.Context, params BackupPara bw := newBackupWriter(fe.Name, builtinBackupStorageWriteBufferSize, fi.Size(), timedDest) - var reader io.Reader = br - var writer io.Writer = bw + // We create the following inner function because: + // - we must `defer` the compressor's Close() function + // - but it must take place before we close the pipe reader&writer + createAndCopy := func() (createAndCopyErr error) { + var reader io.Reader = br + var writer io.Writer = bw + + // Create the gzip compression pipe, if necessary. + if backupStorageCompress { + var compressor io.WriteCloser + if ExternalCompressorCmd != "" { + compressor, err = newExternalCompressor(ctx, ExternalCompressorCmd, writer, params.Logger) + } else { + compressor, err = newBuiltinCompressor(CompressionEngineName, writer, params.Logger) + } + if err != nil { + return vterrors.Wrap(err, "can't create compressor") + } - // Create the gzip compression pipe, if necessary. - var compressor io.WriteCloser - if backupStorageCompress { - if ExternalCompressorCmd != "" { - compressor, err = newExternalCompressor(ctx, ExternalCompressorCmd, writer, params.Logger) - } else { - compressor, err = newBuiltinCompressor(CompressionEngineName, writer, params.Logger) - } - if err != nil { - return vterrors.Wrap(err, "can't create compressor") - } + compressStats := params.Stats.Scope(stats.Operation("Compressor:Write")) + writer = ioutil.NewMeteredWriter(compressor, compressStats.TimedIncrementBytes) - compressStats := params.Stats.Scope(stats.Operation("Compressor:Write")) - writer = ioutil.NewMeteredWriter(compressor, compressStats.TimedIncrementBytes) - } + closer := ioutil.NewTimeoutCloser(ctx, compressor, closeTimeout) + defer func() { + // Close gzip to flush it, after that all data is sent to writer. + closeCompressorAt := time.Now() + params.Logger.Infof("closing compressor") + if cerr := closer.Close(); err != nil { + cerr = vterrors.Wrapf(cerr, "failed to close compressor %v", name) + params.Logger.Error(cerr) + createAndCopyErr = errors.Join(createAndCopyErr, cerr) + } + params.Stats.Scope(stats.Operation("Compressor:Close")).TimedIncrement(time.Since(closeCompressorAt)) + }() + } - if builtinBackupFileReadBufferSize > 0 { - reader = bufio.NewReaderSize(br, int(builtinBackupFileReadBufferSize)) - } + if builtinBackupFileReadBufferSize > 0 { + reader = bufio.NewReaderSize(br, int(builtinBackupFileReadBufferSize)) + } - // Copy from the source file to writer (optional gzip, - // optional pipe, tee, output file and hasher). - _, err = io.Copy(writer, reader) - if err != nil { - return vterrors.Wrap(err, "cannot copy data") + // Copy from the source file to writer (optional gzip, + // optional pipe, tee, output file and hasher). + _, err = io.Copy(writer, reader) + if err != nil { + return vterrors.Wrap(err, "cannot copy data") + } + return nil } - // Close gzip to flush it, after that all data is sent to writer. - if compressor != nil { - closeCompressorAt := time.Now() - if err = compressor.Close(); err != nil { - return vterrors.Wrap(err, "cannot close compressor") - } - params.Stats.Scope(stats.Operation("Compressor:Close")).TimedIncrement(time.Since(closeCompressorAt)) + if err := createAndCopy(); err != nil { + return err } // Close the backupPipe to finish writing on destination. @@ -1017,6 +1031,8 @@ func (be *BuiltinBackupEngine) restoreFiles(ctx context.Context, params RestoreP // restoreFile restores an individual file. func (be *BuiltinBackupEngine) restoreFile(ctx context.Context, params RestoreParams, bh backupstorage.BackupHandle, fe *FileEntry, bm builtinBackupManifest, name string) (finalErr error) { + ctx, cancel := context.WithCancel(ctx) + defer cancel() // Open the source file for reading. openSourceAt := time.Now() source, err := bh.ReadFile(ctx, name) @@ -1049,12 +1065,7 @@ func (be *BuiltinBackupEngine) restoreFile(ctx context.Context, params RestorePa defer func() { closeDestAt := time.Now() if cerr := dest.Close(); cerr != nil { - if finalErr != nil { - // We already have an error, just log this one. - log.Errorf("failed to close file %v: %v", name, cerr) - } else { - finalErr = vterrors.Wrap(cerr, "failed to close destination file") - } + finalErr = errors.Join(finalErr, vterrors.Wrap(cerr, "failed to close destination file")) } params.Stats.Scope(stats.Operation("Destination:Close")).TimedIncrement(time.Since(closeDestAt)) }() @@ -1093,27 +1104,25 @@ func (be *BuiltinBackupEngine) restoreFile(ctx context.Context, params RestorePa if err != nil { return vterrors.Wrap(err, "can't create decompressor") } + closer := ioutil.NewTimeoutCloser(ctx, decompressor, closeTimeout) decompressStats := params.Stats.Scope(stats.Operation("Decompressor:Read")) reader = ioutil.NewMeteredReader(decompressor, decompressStats.TimedIncrementBytes) defer func() { closeDecompressorAt := time.Now() - if cerr := decompressor.Close(); cerr != nil { - params.Logger.Errorf("failed to close decompressor: %v", cerr) - if finalErr != nil { - // We already have an error, just log this one. - log.Errorf("failed to close decompressor %v: %v", name, cerr) - } else { - finalErr = vterrors.Wrap(cerr, "failed to close decompressor") - } + params.Logger.Infof("closing decompressor") + if cerr := closer.Close(); err != nil { + cerr = vterrors.Wrapf(cerr, "failed to close decompressor %v", name) + params.Logger.Error(cerr) + finalErr = errors.Join(finalErr, cerr) } params.Stats.Scope(stats.Operation("Decompressor:Close")).TimedIncrement(time.Since(closeDecompressorAt)) }() } // Copy the data. Will also write to the hasher. - if _, err = io.Copy(bufferedDest, reader); err != nil { + if _, err := io.Copy(bufferedDest, reader); err != nil { return vterrors.Wrap(err, "failed to copy file contents") } diff --git a/go/vt/mysqlctl/builtinbackupengine2_test.go b/go/vt/mysqlctl/builtinbackupengine2_test.go deleted file mode 100644 index 29252392c7f..00000000000 --- a/go/vt/mysqlctl/builtinbackupengine2_test.go +++ /dev/null @@ -1,69 +0,0 @@ -/* -Copyright 2023 The Vitess Authors. - -Licensed under the Apache License, Version 2.0 (the "License"); -you may not use this file except in compliance with the License. -You may obtain a copy of the License at - - http://www.apache.org/licenses/LICENSE-2.0 - -Unless required by applicable law or agreed to in writing, software -distributed under the License is distributed on an "AS IS" BASIS, -WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. -See the License for the specific language governing permissions and -limitations under the License. -*/ - -// Package mysqlctl_test is the blackbox tests for package mysqlctl. -package mysqlctl - -import ( - "testing" - - "github.com/stretchr/testify/assert" -) - -func TestGetIncrementalFromPosGTIDSet(t *testing.T) { - tcases := []struct { - incrementalFromPos string - gtidSet string - expctError bool - }{ - { - "MySQL56/16b1039f-22b6-11ed-b765-0a43f95f28a3:1-615", - "16b1039f-22b6-11ed-b765-0a43f95f28a3:1-615", - false, - }, - { - "16b1039f-22b6-11ed-b765-0a43f95f28a3:1-615", - "16b1039f-22b6-11ed-b765-0a43f95f28a3:1-615", - false, - }, - { - "MySQL56/16b1039f-22b6-11ed-b765-0a43f95f28a3", - "", - true, - }, - { - "MySQL56/invalid", - "", - true, - }, - { - "16b1039f-22b6-11ed-b765-0a43f95f28a3", - "", - true, - }, - } - for _, tcase := range tcases { - t.Run(tcase.incrementalFromPos, func(t *testing.T) { - gtidSet, err := getIncrementalFromPosGTIDSet(tcase.incrementalFromPos) - if tcase.expctError { - assert.Error(t, err) - } else { - assert.NoError(t, err) - assert.Equal(t, tcase.gtidSet, gtidSet.String()) - } - }) - } -} diff --git a/go/vt/mysqlctl/builtinbackupengine_test.go b/go/vt/mysqlctl/builtinbackupengine_test.go index f7af0cdc306..29252392c7f 100644 --- a/go/vt/mysqlctl/builtinbackupengine_test.go +++ b/go/vt/mysqlctl/builtinbackupengine_test.go @@ -1,5 +1,5 @@ /* -Copyright 2022 The Vitess Authors. +Copyright 2023 The Vitess Authors. Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except in compliance with the License. @@ -15,565 +15,55 @@ limitations under the License. */ // Package mysqlctl_test is the blackbox tests for package mysqlctl. -package mysqlctl_test +package mysqlctl import ( - "context" - "fmt" - "os" - "path" - "strings" "testing" - "time" "github.com/stretchr/testify/assert" - "github.com/stretchr/testify/require" - - "vitess.io/vitess/go/mysql/replication" - - "vitess.io/vitess/go/sqltypes" - - "vitess.io/vitess/go/mysql" - "vitess.io/vitess/go/mysql/fakesqldb" - "vitess.io/vitess/go/vt/logutil" - "vitess.io/vitess/go/vt/mysqlctl" - "vitess.io/vitess/go/vt/mysqlctl/backupstats" - "vitess.io/vitess/go/vt/mysqlctl/filebackupstorage" - "vitess.io/vitess/go/vt/proto/topodata" - "vitess.io/vitess/go/vt/proto/vttime" - "vitess.io/vitess/go/vt/topo" - "vitess.io/vitess/go/vt/topo/memorytopo" ) -func setBuiltinBackupMysqldDeadline(t time.Duration) time.Duration { - old := mysqlctl.BuiltinBackupMysqldTimeout - mysqlctl.BuiltinBackupMysqldTimeout = t - - return old -} - -func createBackupDir(root string, dirs ...string) error { - for _, dir := range dirs { - if err := os.MkdirAll(path.Join(root, dir), 0755); err != nil { - return err - } - } - - return nil -} - -func createBackupFiles(root string, fileCount int, ext string) error { - for i := 0; i < fileCount; i++ { - f, err := os.Create(path.Join(root, fmt.Sprintf("%d.%s", i, ext))) - if err != nil { - return err - } - if _, err := f.Write([]byte("hello, world!")); err != nil { - return err - } - defer f.Close() - } - - return nil -} - -func TestExecuteBackup(t *testing.T) { - // Set up local backup directory - backupRoot := "testdata/builtinbackup_test" - filebackupstorage.FileBackupStorageRoot = backupRoot - require.NoError(t, createBackupDir(backupRoot, "innodb", "log", "datadir")) - dataDir := path.Join(backupRoot, "datadir") - // Add some files under data directory to force backup to actually backup files. - require.NoError(t, createBackupDir(dataDir, "test1")) - require.NoError(t, createBackupDir(dataDir, "test2")) - require.NoError(t, createBackupFiles(path.Join(dataDir, "test1"), 2, "ibd")) - require.NoError(t, createBackupFiles(path.Join(dataDir, "test2"), 2, "ibd")) - defer os.RemoveAll(backupRoot) - - ctx := context.Background() - - needIt, err := needInnoDBRedoLogSubdir() - require.NoError(t, err) - if needIt { - fpath := path.Join("log", mysql.DynamicRedoLogSubdir) - if err := createBackupDir(backupRoot, fpath); err != nil { - require.Failf(t, err.Error(), "failed to create directory: %s", fpath) - } - } - - // Set up topo - keyspace, shard := "mykeyspace", "-80" - ts := memorytopo.NewServer("cell1") - defer ts.Close() - - require.NoError(t, ts.CreateKeyspace(ctx, keyspace, &topodata.Keyspace{})) - require.NoError(t, ts.CreateShard(ctx, keyspace, shard)) - - tablet := topo.NewTablet(100, "cell1", "mykeyspace-00-80-0100") - tablet.Keyspace = keyspace - tablet.Shard = shard - - require.NoError(t, ts.CreateTablet(ctx, tablet)) - - _, err = ts.UpdateShardFields(ctx, keyspace, shard, func(si *topo.ShardInfo) error { - si.PrimaryAlias = &topodata.TabletAlias{Uid: 100, Cell: "cell1"} - - now := time.Now() - si.PrimaryTermStartTime = &vttime.Time{Seconds: int64(now.Second()), Nanoseconds: int32(now.Nanosecond())} - - return nil - }) - require.NoError(t, err) - - be := &mysqlctl.BuiltinBackupEngine{} - - // Configure a tight deadline to force a timeout - oldDeadline := setBuiltinBackupMysqldDeadline(time.Second) - defer setBuiltinBackupMysqldDeadline(oldDeadline) - - bh := filebackupstorage.NewBackupHandle(nil, "", "", false) - - // Spin up a fake daemon to be used in backups. It needs to be allowed to receive: - // "STOP SLAVE", "START SLAVE", in that order. - mysqld := mysqlctl.NewFakeMysqlDaemon(fakesqldb.New(t)) - mysqld.ExpectedExecuteSuperQueryList = []string{"STOP SLAVE", "START SLAVE"} - // mysqld.ShutdownTime = time.Minute - - fakeStats := backupstats.NewFakeStats() - - ok, err := be.ExecuteBackup(ctx, mysqlctl.BackupParams{ - Logger: logutil.NewConsoleLogger(), - Mysqld: mysqld, - Cnf: &mysqlctl.Mycnf{ - InnodbDataHomeDir: path.Join(backupRoot, "innodb"), - InnodbLogGroupHomeDir: path.Join(backupRoot, "log"), - DataDir: path.Join(backupRoot, "datadir"), - }, - Concurrency: 2, - HookExtraEnv: map[string]string{}, - TopoServer: ts, - Keyspace: keyspace, - Shard: shard, - Stats: fakeStats, - }, bh) - - require.NoError(t, err) - assert.True(t, ok) - - var destinationCloseStats int - var destinationOpenStats int - var destinationWriteStats int - var sourceCloseStats int - var sourceOpenStats int - var sourceReadStats int - - for _, sr := range fakeStats.ScopeReturns { - sfs := sr.(*backupstats.FakeStats) - switch sfs.ScopeV[backupstats.ScopeOperation] { - case "Destination:Close": - destinationCloseStats++ - require.Len(t, sfs.TimedIncrementCalls, 1) - case "Destination:Open": - destinationOpenStats++ - require.Len(t, sfs.TimedIncrementCalls, 1) - case "Destination:Write": - destinationWriteStats++ - require.GreaterOrEqual(t, len(sfs.TimedIncrementBytesCalls), 1) - case "Source:Close": - sourceCloseStats++ - require.Len(t, sfs.TimedIncrementCalls, 1) - case "Source:Open": - sourceOpenStats++ - require.Len(t, sfs.TimedIncrementCalls, 1) - case "Source:Read": - sourceReadStats++ - require.GreaterOrEqual(t, len(sfs.TimedIncrementBytesCalls), 1) - } - } - - require.Equal(t, 4, destinationCloseStats) - require.Equal(t, 4, destinationOpenStats) - require.Equal(t, 4, destinationWriteStats) - require.Equal(t, 4, sourceCloseStats) - require.Equal(t, 4, sourceOpenStats) - require.Equal(t, 4, sourceReadStats) - - mysqld.ExpectedExecuteSuperQueryCurrent = 0 // resest the index of what queries we've run - mysqld.ShutdownTime = time.Minute // reminder that shutdownDeadline is 1s - - ok, err = be.ExecuteBackup(ctx, mysqlctl.BackupParams{ - Logger: logutil.NewConsoleLogger(), - Mysqld: mysqld, - Cnf: &mysqlctl.Mycnf{ - InnodbDataHomeDir: path.Join(backupRoot, "innodb"), - InnodbLogGroupHomeDir: path.Join(backupRoot, "log"), - DataDir: path.Join(backupRoot, "datadir"), +func TestGetIncrementalFromPosGTIDSet(t *testing.T) { + tcases := []struct { + incrementalFromPos string + gtidSet string + expctError bool + }{ + { + "MySQL56/16b1039f-22b6-11ed-b765-0a43f95f28a3:1-615", + "16b1039f-22b6-11ed-b765-0a43f95f28a3:1-615", + false, }, - HookExtraEnv: map[string]string{}, - TopoServer: ts, - Keyspace: keyspace, - Shard: shard, - }, bh) - - assert.Error(t, err) - assert.False(t, ok) -} - -func TestExecuteBackupWithSafeUpgrade(t *testing.T) { - // Set up local backup directory - backupRoot := "testdata/builtinbackup_test" - filebackupstorage.FileBackupStorageRoot = backupRoot - require.NoError(t, createBackupDir(backupRoot, "innodb", "log", "datadir")) - dataDir := path.Join(backupRoot, "datadir") - // Add some files under data directory to force backup to actually backup files. - require.NoError(t, createBackupDir(dataDir, "test1")) - require.NoError(t, createBackupDir(dataDir, "test2")) - require.NoError(t, createBackupFiles(path.Join(dataDir, "test1"), 2, "ibd")) - require.NoError(t, createBackupFiles(path.Join(dataDir, "test2"), 2, "ibd")) - defer os.RemoveAll(backupRoot) - - ctx := context.Background() - - needIt, err := needInnoDBRedoLogSubdir() - require.NoError(t, err) - if needIt { - fpath := path.Join("log", mysql.DynamicRedoLogSubdir) - if err := createBackupDir(backupRoot, fpath); err != nil { - require.Failf(t, err.Error(), "failed to create directory: %s", fpath) - } - } - - // Set up topo - keyspace, shard := "mykeyspace", "-80" - ts := memorytopo.NewServer("cell1") - defer ts.Close() - - require.NoError(t, ts.CreateKeyspace(ctx, keyspace, &topodata.Keyspace{})) - require.NoError(t, ts.CreateShard(ctx, keyspace, shard)) - - tablet := topo.NewTablet(100, "cell1", "mykeyspace-00-80-0100") - tablet.Keyspace = keyspace - tablet.Shard = shard - - require.NoError(t, ts.CreateTablet(ctx, tablet)) - - _, err = ts.UpdateShardFields(ctx, keyspace, shard, func(si *topo.ShardInfo) error { - si.PrimaryAlias = &topodata.TabletAlias{Uid: 100, Cell: "cell1"} - - now := time.Now() - si.PrimaryTermStartTime = &vttime.Time{Seconds: int64(now.Second()), Nanoseconds: int32(now.Nanosecond())} - - return nil - }) - require.NoError(t, err) - - be := &mysqlctl.BuiltinBackupEngine{} - - // Configure a tight deadline to force a timeout - oldDeadline := setBuiltinBackupMysqldDeadline(time.Second) - defer setBuiltinBackupMysqldDeadline(oldDeadline) - - bh := filebackupstorage.NewBackupHandle(nil, "", "", false) - - // Spin up a fake daemon to be used in backups. It needs to be allowed to receive: - // "STOP SLAVE", "START SLAVE", in that order. - // It also needs to be allowed to receive the query to disable the innodb_fast_shutdown flag. - mysqld := mysqlctl.NewFakeMysqlDaemon(fakesqldb.New(t)) - mysqld.ExpectedExecuteSuperQueryList = []string{"STOP SLAVE", "START SLAVE"} - mysqld.FetchSuperQueryMap = map[string]*sqltypes.Result{ - "SET GLOBAL innodb_fast_shutdown=0": {}, - } - - ok, err := be.ExecuteBackup(ctx, mysqlctl.BackupParams{ - Logger: logutil.NewConsoleLogger(), - Mysqld: mysqld, - Cnf: &mysqlctl.Mycnf{ - InnodbDataHomeDir: path.Join(backupRoot, "innodb"), - InnodbLogGroupHomeDir: path.Join(backupRoot, "log"), - DataDir: path.Join(backupRoot, "datadir"), + { + "16b1039f-22b6-11ed-b765-0a43f95f28a3:1-615", + "16b1039f-22b6-11ed-b765-0a43f95f28a3:1-615", + false, }, - Concurrency: 2, - TopoServer: ts, - Keyspace: keyspace, - Shard: shard, - Stats: backupstats.NewFakeStats(), - UpgradeSafe: true, - }, bh) - - require.NoError(t, err) - assert.True(t, ok) -} - -// TestExecuteBackupWithCanceledContext tests the ability of the backup function to gracefully handle cases where errors -// occur due to various reasons, such as context time cancel. The process should not panic in these situations. -func TestExecuteBackupWithCanceledContext(t *testing.T) { - // Set up local backup directory - id := fmt.Sprintf("%d", time.Now().UnixNano()) - backupRoot := fmt.Sprintf("testdata/builtinbackup_test_%s", id) - filebackupstorage.FileBackupStorageRoot = backupRoot - require.NoError(t, createBackupDir(backupRoot, "innodb", "log", "datadir")) - dataDir := path.Join(backupRoot, "datadir") - // Add some files under data directory to force backup to execute semaphore acquire inside - // backupFiles() method (https://github.com/vitessio/vitess/blob/main/go/vt/mysqlctl/builtinbackupengine.go#L483). - require.NoError(t, createBackupDir(dataDir, "test1")) - require.NoError(t, createBackupDir(dataDir, "test2")) - require.NoError(t, createBackupFiles(path.Join(dataDir, "test1"), 2, "ibd")) - require.NoError(t, createBackupFiles(path.Join(dataDir, "test2"), 2, "ibd")) - defer os.RemoveAll(backupRoot) - - // Cancel the context deliberately - ctx, cancel := context.WithCancel(context.Background()) - cancel() - needIt, err := needInnoDBRedoLogSubdir() - require.NoError(t, err) - if needIt { - fpath := path.Join("log", mysql.DynamicRedoLogSubdir) - if err := createBackupDir(backupRoot, fpath); err != nil { - require.Failf(t, err.Error(), "failed to create directory: %s", fpath) - } - } - - // Set up topo - keyspace, shard := "mykeyspace", "-80" - ts := memorytopo.NewServer("cell1") - defer ts.Close() - - require.NoError(t, ts.CreateKeyspace(ctx, keyspace, &topodata.Keyspace{})) - require.NoError(t, ts.CreateShard(ctx, keyspace, shard)) - - tablet := topo.NewTablet(100, "cell1", "mykeyspace-00-80-0100") - tablet.Keyspace = keyspace - tablet.Shard = shard - - require.NoError(t, ts.CreateTablet(ctx, tablet)) - - _, err = ts.UpdateShardFields(ctx, keyspace, shard, func(si *topo.ShardInfo) error { - si.PrimaryAlias = &topodata.TabletAlias{Uid: 100, Cell: "cell1"} - - now := time.Now() - si.PrimaryTermStartTime = &vttime.Time{Seconds: int64(now.Second()), Nanoseconds: int32(now.Nanosecond())} - - return nil - }) - require.NoError(t, err) - - be := &mysqlctl.BuiltinBackupEngine{} - bh := filebackupstorage.NewBackupHandle(nil, "", "", false) - // Spin up a fake daemon to be used in backups. It needs to be allowed to receive: - // "STOP SLAVE", "START SLAVE", in that order. - mysqld := mysqlctl.NewFakeMysqlDaemon(fakesqldb.New(t)) - mysqld.ExpectedExecuteSuperQueryList = []string{"STOP SLAVE", "START SLAVE"} - - ok, err := be.ExecuteBackup(ctx, mysqlctl.BackupParams{ - Logger: logutil.NewConsoleLogger(), - Mysqld: mysqld, - Cnf: &mysqlctl.Mycnf{ - InnodbDataHomeDir: path.Join(backupRoot, "innodb"), - InnodbLogGroupHomeDir: path.Join(backupRoot, "log"), - DataDir: path.Join(backupRoot, "datadir"), + { + "MySQL56/16b1039f-22b6-11ed-b765-0a43f95f28a3", + "", + true, }, - Stats: backupstats.NewFakeStats(), - Concurrency: 2, - HookExtraEnv: map[string]string{}, - TopoServer: ts, - Keyspace: keyspace, - Shard: shard, - }, bh) - - require.Error(t, err) - // all four files will fail - require.ErrorContains(t, err, "context canceled;context canceled;context canceled;context canceled") - assert.False(t, ok) -} - -// TestExecuteRestoreWithCanceledContext tests the ability of the restore function to gracefully handle cases where errors -// occur due to various reasons, such as context timed-out. The process should not panic in these situations. -func TestExecuteRestoreWithTimedOutContext(t *testing.T) { - // Set up local backup directory - id := fmt.Sprintf("%d", time.Now().UnixNano()) - backupRoot := fmt.Sprintf("testdata/builtinbackup_test_%s", id) - filebackupstorage.FileBackupStorageRoot = backupRoot - require.NoError(t, createBackupDir(backupRoot, "innodb", "log", "datadir")) - dataDir := path.Join(backupRoot, "datadir") - // Add some files under data directory to force backup to execute semaphore acquire inside - // backupFiles() method (https://github.com/vitessio/vitess/blob/main/go/vt/mysqlctl/builtinbackupengine.go#L483). - require.NoError(t, createBackupDir(dataDir, "test1")) - require.NoError(t, createBackupDir(dataDir, "test2")) - require.NoError(t, createBackupFiles(path.Join(dataDir, "test1"), 2, "ibd")) - require.NoError(t, createBackupFiles(path.Join(dataDir, "test2"), 2, "ibd")) - defer os.RemoveAll(backupRoot) - - ctx := context.Background() - needIt, err := needInnoDBRedoLogSubdir() - require.NoError(t, err) - if needIt { - fpath := path.Join("log", mysql.DynamicRedoLogSubdir) - if err := createBackupDir(backupRoot, fpath); err != nil { - require.Failf(t, err.Error(), "failed to create directory: %s", fpath) - } - } - - // Set up topo - keyspace, shard := "mykeyspace", "-80" - ts := memorytopo.NewServer("cell1") - defer ts.Close() - - require.NoError(t, ts.CreateKeyspace(ctx, keyspace, &topodata.Keyspace{})) - require.NoError(t, ts.CreateShard(ctx, keyspace, shard)) - - tablet := topo.NewTablet(100, "cell1", "mykeyspace-00-80-0100") - tablet.Keyspace = keyspace - tablet.Shard = shard - - require.NoError(t, ts.CreateTablet(ctx, tablet)) - - _, err = ts.UpdateShardFields(ctx, keyspace, shard, func(si *topo.ShardInfo) error { - si.PrimaryAlias = &topodata.TabletAlias{Uid: 100, Cell: "cell1"} - - now := time.Now() - si.PrimaryTermStartTime = &vttime.Time{Seconds: int64(now.Second()), Nanoseconds: int32(now.Nanosecond())} - - return nil - }) - require.NoError(t, err) - - be := &mysqlctl.BuiltinBackupEngine{} - bh := filebackupstorage.NewBackupHandle(nil, "", "", false) - // Spin up a fake daemon to be used in backups. It needs to be allowed to receive: - // "STOP SLAVE", "START SLAVE", in that order. - mysqld := mysqlctl.NewFakeMysqlDaemon(fakesqldb.New(t)) - mysqld.ExpectedExecuteSuperQueryList = []string{"STOP SLAVE", "START SLAVE"} - - ok, err := be.ExecuteBackup(ctx, mysqlctl.BackupParams{ - Logger: logutil.NewConsoleLogger(), - Mysqld: mysqld, - Cnf: &mysqlctl.Mycnf{ - InnodbDataHomeDir: path.Join(backupRoot, "innodb"), - InnodbLogGroupHomeDir: path.Join(backupRoot, "log"), - DataDir: path.Join(backupRoot, "datadir"), + { + "MySQL56/invalid", + "", + true, }, - Stats: backupstats.NewFakeStats(), - Concurrency: 2, - HookExtraEnv: map[string]string{}, - TopoServer: ts, - Keyspace: keyspace, - Shard: shard, - }, bh) - - require.NoError(t, err) - assert.True(t, ok) - - // Now try to restore the above backup. - bh = filebackupstorage.NewBackupHandle(nil, "", "", true) - mysqld = mysqlctl.NewFakeMysqlDaemon(fakesqldb.New(t)) - mysqld.ExpectedExecuteSuperQueryList = []string{"STOP SLAVE", "START SLAVE"} - - fakeStats := backupstats.NewFakeStats() - - restoreParams := mysqlctl.RestoreParams{ - Cnf: &mysqlctl.Mycnf{ - InnodbDataHomeDir: path.Join(backupRoot, "innodb"), - InnodbLogGroupHomeDir: path.Join(backupRoot, "log"), - DataDir: path.Join(backupRoot, "datadir"), - BinLogPath: path.Join(backupRoot, "binlog"), - RelayLogPath: path.Join(backupRoot, "relaylog"), - RelayLogIndexPath: path.Join(backupRoot, "relaylogindex"), - RelayLogInfoPath: path.Join(backupRoot, "relayloginfo"), + { + "16b1039f-22b6-11ed-b765-0a43f95f28a3", + "", + true, }, - Logger: logutil.NewConsoleLogger(), - Mysqld: mysqld, - Concurrency: 2, - HookExtraEnv: map[string]string{}, - DeleteBeforeRestore: false, - DbName: "test", - Keyspace: "test", - Shard: "-", - StartTime: time.Now(), - RestoreToPos: replication.Position{}, - RestoreToTimestamp: time.Time{}, - DryRun: false, - Stats: fakeStats, - } - - // Successful restore. - bm, err := be.ExecuteRestore(ctx, restoreParams, bh) - assert.NoError(t, err) - assert.NotNil(t, bm) - - var destinationCloseStats int - var destinationOpenStats int - var destinationWriteStats int - var sourceCloseStats int - var sourceOpenStats int - var sourceReadStats int - - for _, sr := range fakeStats.ScopeReturns { - sfs := sr.(*backupstats.FakeStats) - switch sfs.ScopeV[backupstats.ScopeOperation] { - case "Destination:Close": - destinationCloseStats++ - require.Len(t, sfs.TimedIncrementCalls, 1) - case "Destination:Open": - destinationOpenStats++ - require.Len(t, sfs.TimedIncrementCalls, 1) - case "Destination:Write": - destinationWriteStats++ - require.GreaterOrEqual(t, len(sfs.TimedIncrementBytesCalls), 1) - case "Source:Close": - sourceCloseStats++ - require.Len(t, sfs.TimedIncrementCalls, 1) - case "Source:Open": - sourceOpenStats++ - require.Len(t, sfs.TimedIncrementCalls, 1) - case "Source:Read": - sourceReadStats++ - require.GreaterOrEqual(t, len(sfs.TimedIncrementBytesCalls), 1) - } - } - - require.Equal(t, 4, destinationCloseStats) - require.Equal(t, 4, destinationOpenStats) - require.Equal(t, 4, destinationWriteStats) - require.Equal(t, 4, sourceCloseStats) - require.Equal(t, 4, sourceOpenStats) - require.Equal(t, 4, sourceReadStats) - - // Restore using timed-out context - mysqld = mysqlctl.NewFakeMysqlDaemon(fakesqldb.New(t)) - mysqld.ExpectedExecuteSuperQueryList = []string{"STOP SLAVE", "START SLAVE"} - restoreParams.Mysqld = mysqld - timedOutCtx, cancel := context.WithTimeout(ctx, 1*time.Second) - defer cancel() - // Let the context time out. - time.Sleep(1 * time.Second) - bm, err = be.ExecuteRestore(timedOutCtx, restoreParams, bh) - // ExecuteRestore should fail. - assert.Error(t, err) - assert.Nil(t, bm) - // error message can contain any combination of "context deadline exceeded" or "context canceled" - if !strings.Contains(err.Error(), "context canceled") && !strings.Contains(err.Error(), "context deadline exceeded") { - assert.Fail(t, "Test should fail with either `context canceled` or `context deadline exceeded`") - } -} - -// needInnoDBRedoLogSubdir indicates whether we need to create a redo log subdirectory. -// Starting with MySQL 8.0.30, the InnoDB redo logs are stored in a subdirectory of the -// (/. by default) called "#innodb_redo". See: -// -// https://dev.mysql.com/doc/refman/8.0/en/innodb-redo-log.html#innodb-modifying-redo-log-capacity -func needInnoDBRedoLogSubdir() (needIt bool, err error) { - mysqldVersionStr, err := mysqlctl.GetVersionString() - if err != nil { - return needIt, err - } - _, sv, err := mysqlctl.ParseVersionString(mysqldVersionStr) - if err != nil { - return needIt, err } - versionStr := fmt.Sprintf("%d.%d.%d", sv.Major, sv.Minor, sv.Patch) - _, capableOf, _ := mysql.GetFlavor(versionStr, nil) - if capableOf == nil { - return needIt, fmt.Errorf("cannot determine database flavor details for version %s", versionStr) + for _, tcase := range tcases { + t.Run(tcase.incrementalFromPos, func(t *testing.T) { + gtidSet, err := getIncrementalFromPosGTIDSet(tcase.incrementalFromPos) + if tcase.expctError { + assert.Error(t, err) + } else { + assert.NoError(t, err) + assert.Equal(t, tcase.gtidSet, gtidSet.String()) + } + }) } - return capableOf(mysql.DynamicRedoLogCapacityFlavorCapability) } diff --git a/go/vt/mysqlctl/xtrabackupengine.go b/go/vt/mysqlctl/xtrabackupengine.go index 59cd4cb9d95..fb9fa4c9667 100644 --- a/go/vt/mysqlctl/xtrabackupengine.go +++ b/go/vt/mysqlctl/xtrabackupengine.go @@ -32,8 +32,8 @@ import ( "github.com/spf13/pflag" + "vitess.io/vitess/go/ioutil" "vitess.io/vitess/go/mysql/replication" - "vitess.io/vitess/go/vt/logutil" "vitess.io/vitess/go/vt/mysqlctl/backupstorage" "vitess.io/vitess/go/vt/proto/vtrpc" @@ -71,9 +71,6 @@ const ( xtrabackupBinaryName = "xtrabackup" xtrabackupEngineName = "xtrabackup" xbstream = "xbstream" - - // closeTimeout is the timeout for closing backup files after writing. - closeTimeout = 10 * time.Minute ) // xtraBackupManifest represents a backup. @@ -364,7 +361,7 @@ func (be *XtrabackupEngine) backupFiles( destWriters := []io.Writer{} destBuffers := []*bufio.Writer{} - destCompressors := []io.WriteCloser{} + destCompressors := []io.Closer{} for _, file := range destFiles { buffer := bufio.NewWriterSize(file, writerBufferSize) destBuffers = append(destBuffers, buffer) @@ -384,7 +381,7 @@ func (be *XtrabackupEngine) backupFiles( } writer = compressor - destCompressors = append(destCompressors, compressor) + destCompressors = append(destCompressors, ioutil.NewTimeoutCloser(ctx, compressor, closeTimeout)) } destWriters = append(destWriters, writer) @@ -632,7 +629,7 @@ func (be *XtrabackupEngine) extractFiles(ctx context.Context, logger logutil.Log }() srcReaders := []io.Reader{} - srcDecompressors := []io.ReadCloser{} + srcDecompressors := []io.Closer{} for _, file := range srcFiles { reader := io.Reader(file) @@ -665,7 +662,7 @@ func (be *XtrabackupEngine) extractFiles(ctx context.Context, logger logutil.Log if err != nil { return vterrors.Wrap(err, "can't create decompressor") } - srcDecompressors = append(srcDecompressors, decompressor) + srcDecompressors = append(srcDecompressors, ioutil.NewTimeoutCloser(ctx, decompressor, closeTimeout)) reader = decompressor }