diff --git a/go/cmd/vtctldclient/command/schema.go b/go/cmd/vtctldclient/command/schema.go index 280463b2792..2ebe6bbb917 100644 --- a/go/cmd/vtctldclient/command/schema.go +++ b/go/cmd/vtctldclient/command/schema.go @@ -298,7 +298,7 @@ func init() { ApplySchema.Flags().StringVar(&applySchemaOptions.CallerID, "caller-id", "", "Effective caller ID used for the operation and should map to an ACL name which grants this identity the necessary permissions to perform the operation (this is only necessary when strict table ACLs are used).") ApplySchema.Flags().StringArrayVar(&applySchemaOptions.SQL, "sql", nil, "Semicolon-delimited, repeatable SQL commands to apply. Exactly one of --sql|--sql-file is required.") ApplySchema.Flags().StringVar(&applySchemaOptions.SQLFile, "sql-file", "", "Path to a file containing semicolon-delimited SQL commands to apply. Exactly one of --sql|--sql-file is required.") - ApplySchema.Flags().Int64Var(&applySchemaOptions.BatchSize, "batch-size", 0, "How many queries to batch together. Only applicabel when all queries are CREATE TABLE|VIEW") + ApplySchema.Flags().Int64Var(&applySchemaOptions.BatchSize, "batch-size", 0, "How many queries to batch together. Only applicable when all queries are CREATE TABLE|VIEW") Root.AddCommand(ApplySchema) diff --git a/go/test/endtoend/backup/pitr/backup_pitr_test.go b/go/test/endtoend/backup/pitr/backup_pitr_test.go index 6a21021f176..a1b29ef47dd 100644 --- a/go/test/endtoend/backup/pitr/backup_pitr_test.go +++ b/go/test/endtoend/backup/pitr/backup_pitr_test.go @@ -55,3 +55,15 @@ func TestIncrementalBackupAndRestoreToTimestamp(t *testing.T) { } backup.ExecTestIncrementalBackupAndRestoreToTimestamp(t, tcase) } + +// TestIncrementalBackupOnTwoTablets runs a series of interleaved backups on two different replicas: full and incremental. +// Specifically, it's designed to test how incremental backups are taken by interleaved replicas, so that they successfully build on +// one another. 
+func TestIncrementalBackupOnTwoTablets(t *testing.T) { + tcase := &backup.PITRTestCase{ + Name: "BuiltinBackup", + SetupType: backup.BuiltinBackup, + ComprssDetails: nil, + } + backup.ExecTestIncrementalBackupOnTwoTablets(t, tcase) +} diff --git a/go/test/endtoend/backup/vtctlbackup/backup_utils.go b/go/test/endtoend/backup/vtctlbackup/backup_utils.go index af2b52f67ea..7ff9d6b860f 100644 --- a/go/test/endtoend/backup/vtctlbackup/backup_utils.go +++ b/go/test/endtoend/backup/vtctlbackup/backup_utils.go @@ -82,11 +82,12 @@ var ( } vtInsertTest = ` - create table vt_insert_test ( - id bigint auto_increment, - msg varchar(64), - primary key (id) - ) Engine=InnoDB` + create table vt_insert_test ( + id bigint auto_increment, + msg varchar(64), + primary key (id) + ) Engine=InnoDB + ` ) type CompressionDetails struct { @@ -163,11 +164,13 @@ func LaunchCluster(setupType int, streamMode string, stripes int, cDetails *Comp commonTabletArg = append(commonTabletArg, getCompressorArgs(cDetails)...) 
var mysqlProcs []*exec.Cmd + tabletTypes := map[int]string{ + 0: "primary", + 1: "replica", + 2: "rdonly", + } for i := 0; i < 3; i++ { - tabletType := "replica" - if i == 0 { - tabletType = "primary" - } + tabletType := tabletTypes[i] tablet := localCluster.NewVttabletInstance(tabletType, 0, cell) tablet.VttabletProcess = localCluster.VtprocessInstanceFromVttablet(tablet, shard.Name, keyspaceName) tablet.VttabletProcess.DbPassword = dbPassword @@ -220,13 +223,16 @@ func LaunchCluster(setupType int, streamMode string, stripes int, cDetails *Comp if err := localCluster.VtctlclientProcess.InitTablet(replica1, cell, keyspaceName, hostname, shard.Name); err != nil { return 1, err } + if err := localCluster.VtctlclientProcess.InitTablet(replica2, cell, keyspaceName, hostname, shard.Name); err != nil { + return 1, err + } vtctldClientProcess := cluster.VtctldClientProcessInstance("localhost", localCluster.VtctldProcess.GrpcPort, localCluster.TmpDirectory) _, err = vtctldClientProcess.ExecuteCommandWithOutput("SetKeyspaceDurabilityPolicy", keyspaceName, "--durability-policy=semi_sync") if err != nil { return 1, err } - for _, tablet := range []cluster.Vttablet{*primary, *replica1} { + for _, tablet := range []*cluster.Vttablet{primary, replica1, replica2} { if err := tablet.VttabletProcess.Setup(); err != nil { return 1, err } @@ -1069,40 +1075,30 @@ func terminateRestore(t *testing.T) { assert.True(t, found, "Restore message not found") } -func vtctlBackupReplicaNoDestroyNoWrites(t *testing.T, tabletType string) (backups []string, destroy func(t *testing.T)) { - restoreWaitForBackup(t, tabletType, nil, true) - verifyInitialReplication(t) - - err := localCluster.VtctldClientProcess.ExecuteCommand("Backup", replica1.Alias) - require.Nil(t, err) - - backups = localCluster.VerifyBackupCount(t, shardKsName, 1) - - verifyTabletBackupStats(t, replica1.VttabletProcess.GetVars()) +func vtctlBackupReplicaNoDestroyNoWrites(t *testing.T, replicaIndex int) (backups []string) { + 
replica := getReplica(t, replicaIndex) - err = replica2.VttabletProcess.WaitForTabletStatusesForTimeout([]string{"SERVING"}, 25*time.Second) + err := localCluster.VtctldClientProcess.ExecuteCommand("Backup", replica.Alias) require.Nil(t, err) - err = replica2.VttabletProcess.TearDown() + backups, err = localCluster.ListBackups(shardKsName) require.Nil(t, err) - err = localCluster.VtctlclientProcess.ExecuteCommand("DeleteTablet", replica2.Alias) - require.Nil(t, err) + verifyTabletBackupStats(t, replica.VttabletProcess.GetVars()) - destroy = func(t *testing.T) { - verifyAfterRemovingBackupNoBackupShouldBePresent(t, backups) - } - return backups, destroy + return backups } -func GetReplicaPosition(t *testing.T) string { - pos, _ := cluster.GetPrimaryPosition(t, *replica1, hostname) +func GetReplicaPosition(t *testing.T, replicaIndex int) string { + replica := getReplica(t, replicaIndex) + pos, _ := cluster.GetPrimaryPosition(t, *replica, hostname) return pos } -func GetReplicaGtidPurged(t *testing.T) string { +func GetReplicaGtidPurged(t *testing.T, replicaIndex int) string { + replica := getReplica(t, replicaIndex) query := "select @@global.gtid_purged as gtid_purged" - rs, err := replica1.VttabletProcess.QueryTablet(query, keyspaceName, true) + rs, err := replica.VttabletProcess.QueryTablet(query, keyspaceName, true) require.NoError(t, err) row := rs.Named().Row() require.NotNil(t, row) @@ -1135,13 +1131,62 @@ func ReadRowsFromPrimary(t *testing.T) (msgs []string) { return ReadRowsFromTablet(t, primary) } -func ReadRowsFromReplica(t *testing.T) (msgs []string) { - return ReadRowsFromTablet(t, replica1) +func getReplica(t *testing.T, replicaIndex int) *cluster.Vttablet { + switch replicaIndex { + case 0: + return replica1 + case 1: + return replica2 + default: + assert.Failf(t, "invalid replica index", "index=%d", replicaIndex) + return nil + } +} + +func ReadRowsFromReplica(t *testing.T, replicaIndex int) (msgs []string) { + return ReadRowsFromTablet(t, 
getReplica(t, replicaIndex)) +} + +// FlushBinaryLogsOnReplica issues `FLUSH BINARY LOGS` `count` times +func FlushBinaryLogsOnReplica(t *testing.T, replicaIndex int, count int) { + replica := getReplica(t, replicaIndex) + query := "flush binary logs" + for i := 0; i < count; i++ { + _, err := replica.VttabletProcess.QueryTablet(query, keyspaceName, true) + require.NoError(t, err) + } +} + +// FlushAndPurgeBinaryLogsOnReplica intentionally loses all existing binary logs. It flushes into a new binary log +// and immediately purges all previous logs. +// This is used to lose information. +func FlushAndPurgeBinaryLogsOnReplica(t *testing.T, replicaIndex int) (lastBinlog string) { + FlushBinaryLogsOnReplica(t, replicaIndex, 1) + + replica := getReplica(t, replicaIndex) + { + query := "show binary logs" + rs, err := replica.VttabletProcess.QueryTablet(query, keyspaceName, true) + require.NoError(t, err) + require.NotEmpty(t, rs.Rows) + for _, row := range rs.Rows { + // binlog file name is first column + lastBinlog = row[0].ToString() + } + } + { + query, err := sqlparser.ParseAndBind("purge binary logs to %a", sqltypes.StringBindVariable(lastBinlog)) + require.NoError(t, err) + _, err = replica.VttabletProcess.QueryTablet(query, keyspaceName, true) + require.NoError(t, err) + } + return lastBinlog } func readManifestFile(t *testing.T, backupLocation string) (manifest *mysqlctl.BackupManifest) { // reading manifest - data, err := os.ReadFile(backupLocation + "/MANIFEST") + fullPath := backupLocation + "/MANIFEST" + data, err := os.ReadFile(fullPath) require.NoErrorf(t, err, "error while reading MANIFEST %v", err) // parsing manifest @@ -1151,19 +1196,19 @@ func readManifestFile(t *testing.T, backupLocation string) (manifest *mysqlctl.B return manifest } -func TestReplicaFullBackup(t *testing.T) (manifest *mysqlctl.BackupManifest, destroy func(t *testing.T)) { - backups, destroy := vtctlBackupReplicaNoDestroyNoWrites(t, "replica") +func TestReplicaFullBackup(t *testing.T,
replicaIndex int) (manifest *mysqlctl.BackupManifest) { + backups := vtctlBackupReplicaNoDestroyNoWrites(t, replicaIndex) backupLocation := localCluster.CurrentVTDATAROOT + "/backups/" + shardKsName + "/" + backups[len(backups)-1] - return readManifestFile(t, backupLocation), destroy + return readManifestFile(t, backupLocation) } -func TestReplicaIncrementalBackup(t *testing.T, incrementalFromPos replication.Position, expectError string) (manifest *mysqlctl.BackupManifest, backupName string) { +func testReplicaIncrementalBackup(t *testing.T, replica *cluster.Vttablet, incrementalFromPos replication.Position, expectError string) (manifest *mysqlctl.BackupManifest, backupName string) { incrementalFromPosArg := "auto" if !incrementalFromPos.IsZero() { incrementalFromPosArg = replication.EncodePosition(incrementalFromPos) } - output, err := localCluster.VtctldClientProcess.ExecuteCommandWithOutput("Backup", "--incremental-from-pos", incrementalFromPosArg, replica1.Alias) + output, err := localCluster.VtctldClientProcess.ExecuteCommandWithOutput("Backup", "--incremental-from-pos", incrementalFromPosArg, replica.Alias) if expectError != "" { require.Errorf(t, err, "expected: %v", expectError) require.Contains(t, output, expectError) @@ -1173,23 +1218,43 @@ func TestReplicaIncrementalBackup(t *testing.T, incrementalFromPos replication.P backups, err := localCluster.ListBackups(shardKsName) require.NoError(t, err) - verifyTabletBackupStats(t, replica1.VttabletProcess.GetVars()) + verifyTabletBackupStats(t, replica.VttabletProcess.GetVars()) backupName = backups[len(backups)-1] backupLocation := localCluster.CurrentVTDATAROOT + "/backups/" + shardKsName + "/" + backupName return readManifestFile(t, backupLocation), backupName } -func TestReplicaRestoreToPos(t *testing.T, restoreToPos replication.Position, expectError string) { +func TestReplicaIncrementalBackup(t *testing.T, replicaIndex int, incrementalFromPos replication.Position, expectError string) (manifest 
*mysqlctl.BackupManifest, backupName string) { + replica := getReplica(t, replicaIndex) + return testReplicaIncrementalBackup(t, replica, incrementalFromPos, expectError) +} + +func TestReplicaFullRestore(t *testing.T, replicaIndex int, expectError string) { + replica := getReplica(t, replicaIndex) + + output, err := localCluster.VtctlclientProcess.ExecuteCommandWithOutput("RestoreFromBackup", replica.Alias) + if expectError != "" { + require.Errorf(t, err, "expected: %v", expectError) + require.Contains(t, output, expectError) + return + } + require.NoErrorf(t, err, "output: %v", output) + verifyTabletRestoreStats(t, replica.VttabletProcess.GetVars()) +} + +func TestReplicaRestoreToPos(t *testing.T, replicaIndex int, restoreToPos replication.Position, expectError string) { + replica := getReplica(t, replicaIndex) + require.False(t, restoreToPos.IsZero()) restoreToPosArg := replication.EncodePosition(restoreToPos) - output, err := localCluster.VtctldClientProcess.ExecuteCommandWithOutput("RestoreFromBackup", "--restore-to-pos", restoreToPosArg, replica1.Alias) + output, err := localCluster.VtctlclientProcess.ExecuteCommandWithOutput("RestoreFromBackup", "--", "--restore_to_pos", restoreToPosArg, replica.Alias) if expectError != "" { require.Errorf(t, err, "expected: %v", expectError) require.Contains(t, output, expectError) return } require.NoErrorf(t, err, "output: %v", output) - verifyTabletRestoreStats(t, replica1.VttabletProcess.GetVars()) + verifyTabletRestoreStats(t, replica.VttabletProcess.GetVars()) } func TestReplicaRestoreToTimestamp(t *testing.T, restoreToTimestamp time.Time, expectError string) { diff --git a/go/test/endtoend/backup/vtctlbackup/pitr_test_framework.go b/go/test/endtoend/backup/vtctlbackup/pitr_test_framework.go index a109440b53b..ddeb43c7dd7 100644 --- a/go/test/endtoend/backup/vtctlbackup/pitr_test_framework.go +++ b/go/test/endtoend/backup/vtctlbackup/pitr_test_framework.go @@ -33,10 +33,20 @@ import ( ) var ( - minimalSleepDuration = 
time.Second + 100*time.Millisecond gracefulPostBackupDuration = 10 * time.Millisecond ) +const ( + postWriteSleepDuration = 2 * time.Second // Nice for debugging purposes: clearly distinguishes the timestamps of certain operations, and, as a result, the names/timestamps of backups. +) + +const ( + operationFullBackup = iota + operationIncrementalBackup + operationRestore + operationFlushAndPurge +) + type PITRTestCase struct { Name string SetupType int @@ -48,12 +58,12 @@ type testedBackupTimestampInfo struct { postTimestamp time.Time } -func waitForReplica(t *testing.T) { +func waitForReplica(t *testing.T, replicaIndex int) { ctx, cancel := context.WithTimeout(context.Background(), 10*time.Second) defer cancel() pMsgs := ReadRowsFromPrimary(t) for { - rMsgs := ReadRowsFromReplica(t) + rMsgs := ReadRowsFromReplica(t, replicaIndex) if len(pMsgs) == len(rMsgs) { // success return @@ -68,7 +78,9 @@ func waitForReplica(t *testing.T) { } } -// ExecTestIncrementalBackupAndRestoreToPos +// ExecTestIncrementalBackupAndRestoreToPos runs a series of backups: a full backup and multiple incremental backups. +// In between, it makes writes to the database, and takes notes: what data was available in what backup. +// It then restores each and every one of those backups, in random order, and expects to find the specific data associated with the backup.
func ExecTestIncrementalBackupAndRestoreToPos(t *testing.T, tcase *PITRTestCase) { defer cluster.PanicHandler(t) @@ -84,8 +96,8 @@ func ExecTestIncrementalBackupAndRestoreToPos(t *testing.T, tcase *PITRTestCase) backupPositions := []string{} recordRowsPerPosition := func(t *testing.T) { - pos := GetReplicaPosition(t) - msgs := ReadRowsFromReplica(t) + pos := GetReplicaPosition(t, 0) + msgs := ReadRowsFromReplica(t, 0) if _, ok := rowsPerPosition[pos]; !ok { backupPositions = append(backupPositions, pos) rowsPerPosition[pos] = len(msgs) @@ -95,13 +107,13 @@ func ExecTestIncrementalBackupAndRestoreToPos(t *testing.T, tcase *PITRTestCase) var fullBackupPos replication.Position t.Run("full backup", func(t *testing.T) { InsertRowOnPrimary(t, "before-full-backup") - waitForReplica(t) + waitForReplica(t, 0) - manifest, _ := TestReplicaFullBackup(t) + manifest := TestReplicaFullBackup(t, 0) fullBackupPos = manifest.Position require.False(t, fullBackupPos.IsZero()) // - msgs := ReadRowsFromReplica(t) + msgs := ReadRowsFromReplica(t, 0) pos := replication.EncodePosition(fullBackupPos) backupPositions = append(backupPositions, pos) rowsPerPosition[pos] = len(msgs) @@ -162,8 +174,10 @@ func ExecTestIncrementalBackupAndRestoreToPos(t *testing.T, tcase *PITRTestCase) // in 1 second resolution. We want to avoid two backups that have the same pathname. Realistically this // is only ever a problem in this end-to-end test, not in production. // Also, we gie the replica a chance to catch up. 
- time.Sleep(minimalSleepDuration) - waitForReplica(t) + time.Sleep(postWriteSleepDuration) + // randomly flush binary logs 0, 1 or 2 times + FlushBinaryLogsOnReplica(t, 0, rand.Intn(3)) + waitForReplica(t, 0) recordRowsPerPosition(t) // configure --incremental-from-pos to either: // - auto @@ -176,7 +190,8 @@ func ExecTestIncrementalBackupAndRestoreToPos(t *testing.T, tcase *PITRTestCase) incrementalFromPos = fullBackupPos } } - manifest, backupName := TestReplicaIncrementalBackup(t, incrementalFromPos, tc.expectError) + // always use same 1st replica + manifest, backupName := TestReplicaIncrementalBackup(t, 0, incrementalFromPos, tc.expectError) if tc.expectError != "" { return } @@ -190,11 +205,11 @@ func ExecTestIncrementalBackupAndRestoreToPos(t *testing.T, tcase *PITRTestCase) require.NotEqual(t, manifest.Position, manifest.FromPosition) require.True(t, manifest.Position.GTIDSet.Union(manifest.PurgedPosition.GTIDSet).Contains(manifest.FromPosition.GTIDSet)) - gtidPurgedPos, err := replication.ParsePosition(replication.Mysql56FlavorID, GetReplicaGtidPurged(t)) + gtidPurgedPos, err := replication.ParsePosition(replication.Mysql56FlavorID, GetReplicaGtidPurged(t, 0)) require.NoError(t, err) fromPositionIncludingPurged := manifest.FromPosition.GTIDSet.Union(gtidPurgedPos.GTIDSet) - expectFromPosition := lastBackupPos.GTIDSet.Union(gtidPurgedPos.GTIDSet) + expectFromPosition := lastBackupPos.GTIDSet if !incrementalFromPos.IsZero() { expectFromPosition = incrementalFromPos.GTIDSet.Union(gtidPurgedPos.GTIDSet) } @@ -209,8 +224,8 @@ func ExecTestIncrementalBackupAndRestoreToPos(t *testing.T, tcase *PITRTestCase) t.Run(testName, func(t *testing.T) { restoreToPos, err := replication.DecodePosition(pos) require.NoError(t, err) - TestReplicaRestoreToPos(t, restoreToPos, "") - msgs := ReadRowsFromReplica(t) + TestReplicaRestoreToPos(t, 0, restoreToPos, "") + msgs := ReadRowsFromReplica(t, 0) count, ok := rowsPerPosition[pos] require.True(t, ok) assert.Equalf(t, count, 
len(msgs), "messages: %v", msgs) @@ -257,13 +272,13 @@ func ExecTestIncrementalBackupAndRestoreToTimestamp(t *testing.T, tcase *PITRTes var fullBackupPos replication.Position t.Run("full backup", func(t *testing.T) { insertRowOnPrimary(t, "before-full-backup") - waitForReplica(t) + waitForReplica(t, 0) - manifest, _ := TestReplicaFullBackup(t) + manifest := TestReplicaFullBackup(t, 0) fullBackupPos = manifest.Position require.False(t, fullBackupPos.IsZero()) // - rows := ReadRowsFromReplica(t) + rows := ReadRowsFromReplica(t, 0) testedBackups = append(testedBackups, testedBackupTimestampInfo{len(rows), time.Now()}) }) @@ -322,9 +337,9 @@ func ExecTestIncrementalBackupAndRestoreToTimestamp(t *testing.T, tcase *PITRTes // in 1 second resolution. We want to avoid two backups that have the same pathname. Realistically this // is only ever a problem in this end-to-end test, not in production. // Also, we gie the replica a chance to catch up. - time.Sleep(minimalSleepDuration) - waitForReplica(t) - rowsBeforeBackup := ReadRowsFromReplica(t) + time.Sleep(postWriteSleepDuration) + waitForReplica(t, 0) + rowsBeforeBackup := ReadRowsFromReplica(t, 0) // configure --incremental-from-pos to either: // - auto // - explicit last backup pos @@ -336,7 +351,7 @@ func ExecTestIncrementalBackupAndRestoreToTimestamp(t *testing.T, tcase *PITRTes incrementalFromPos = fullBackupPos } } - manifest, backupName := TestReplicaIncrementalBackup(t, incrementalFromPos, tc.expectError) + manifest, backupName := TestReplicaIncrementalBackup(t, 0, incrementalFromPos, tc.expectError) if tc.expectError != "" { return } @@ -372,7 +387,7 @@ func ExecTestIncrementalBackupAndRestoreToTimestamp(t *testing.T, tcase *PITRTes } } - gtidPurgedPos, err := replication.ParsePosition(replication.Mysql56FlavorID, GetReplicaGtidPurged(t)) + gtidPurgedPos, err := replication.ParsePosition(replication.Mysql56FlavorID, GetReplicaGtidPurged(t, 0)) require.NoError(t, err) fromPositionIncludingPurged := 
manifest.FromPosition.GTIDSet.Union(gtidPurgedPos.GTIDSet) @@ -399,7 +414,7 @@ func ExecTestIncrementalBackupAndRestoreToTimestamp(t *testing.T, tcase *PITRTes } TestReplicaRestoreToTimestamp(t, testedBackup.postTimestamp, expectError) if expectError == "" { - msgs := ReadRowsFromReplica(t) + msgs := ReadRowsFromReplica(t, 0) assert.Equalf(t, testedBackup.rows, len(msgs), "messages: %v", msgs) numSuccessfulRestores++ } else { @@ -425,3 +440,186 @@ func ExecTestIncrementalBackupAndRestoreToTimestamp(t *testing.T, tcase *PITRTes }) }) } + +// ExecTestIncrementalBackupOnTwoTablets runs a series of interleaved backups on two different replicas: full and incremental. +// Specifically, it's designed to test how incremental backups are taken by interleaved replicas, so that they successfully build on +// one another. +func ExecTestIncrementalBackupOnTwoTablets(t *testing.T, tcase *PITRTestCase) { + defer cluster.PanicHandler(t) + + t.Run(tcase.Name, func(t *testing.T) { + // setup cluster for the testing + code, err := LaunchCluster(tcase.SetupType, "xbstream", 0, tcase.ComprssDetails) + require.NoError(t, err, "setup failed with status code %d", code) + defer TearDownCluster() + + InitTestTable(t) + + rowsPerPosition := map[string]int{} + + recordRowsPerPosition := func(t *testing.T, replicaIndex int) { + pos := GetReplicaPosition(t, replicaIndex) + msgs := ReadRowsFromReplica(t, replicaIndex) + if _, ok := rowsPerPosition[pos]; !ok { + rowsPerPosition[pos] = len(msgs) + } + } + + var lastBackupPos replication.Position + InsertRowOnPrimary(t, "before-incremental-backups") + waitForReplica(t, 0) + waitForReplica(t, 1) + + tt := []struct { + name string + operationType int + replicaIndex int + expectError string + }{ + // The following tests run sequentially and build on top of previous results + { + name: "full1", + operationType: operationFullBackup, + }, + { + name: "incremental1", + operationType: operationIncrementalBackup, + }, + { + name: "restore1", + 
operationType: operationRestore, + }, + { + // Shows you can take an incremental restore when full & incremental backups were only ever executed on a different replica + name: "incremental2", + operationType: operationIncrementalBackup, + replicaIndex: 1, + }, + { + name: "full2", + operationType: operationFullBackup, + replicaIndex: 1, + }, + { + // This incremental backup will use full2 as the base backup + name: "incremental2-after-full2", + operationType: operationIncrementalBackup, + replicaIndex: 1, + }, + { + name: "restore2", + operationType: operationRestore, + replicaIndex: 1, + }, + // Begin a series of interleaved incremental backups + { + name: "incremental-replica1", + operationType: operationIncrementalBackup, + }, + { + name: "incremental-replica2", + operationType: operationIncrementalBackup, + replicaIndex: 1, + }, + { + name: "incremental-replica1", + operationType: operationIncrementalBackup, + }, + { + name: "incremental-replica2", + operationType: operationIncrementalBackup, + replicaIndex: 1, + }, + // Done interleaved backups. + { + // Lose binary log data + name: "flush and purge 1", + operationType: operationFlushAndPurge, + replicaIndex: 0, + }, + { + // Fail to run incremental backup due to lost data + name: "incremental-replica1 failure", + operationType: operationIncrementalBackup, + expectError: "Required entries have been purged", + }, + { + // Lose binary log data + name: "flush and purge 2", + operationType: operationFlushAndPurge, + replicaIndex: 1, + }, + { + // Fail to run incremental backup due to lost data + name: "incremental-replica2 failure", + operationType: operationIncrementalBackup, + replicaIndex: 1, + expectError: "Required entries have been purged", + }, + { + // Since we've lost binlog data, incremental backups are no longer possible. 
The situation can be salvaged by running a full backup + name: "full1 after purge", + operationType: operationFullBackup, + }, + { + // Show that replica2 incremental backup is able to work based on the above full backup + name: "incremental-replica2 after purge and backup", + operationType: operationIncrementalBackup, + replicaIndex: 1, + }, + } + insertRowAndWait := func(t *testing.T, replicaIndex int, data string) { + t.Run("insert row and wait", func(t *testing.T) { + InsertRowOnPrimary(t, data) + time.Sleep(postWriteSleepDuration) + waitForReplica(t, replicaIndex) + recordRowsPerPosition(t, replicaIndex) + }) + } + for _, tc := range tt { + t.Run(tc.name, func(t *testing.T) { + insertRowAndWait(t, tc.replicaIndex, tc.name) + t.Run("running operation", func(t *testing.T) { + switch tc.operationType { + case operationFlushAndPurge: + FlushAndPurgeBinaryLogsOnReplica(t, tc.replicaIndex) + case operationFullBackup: + manifest := TestReplicaFullBackup(t, tc.replicaIndex) + fullBackupPos := manifest.Position + require.False(t, fullBackupPos.IsZero()) + // + msgs := ReadRowsFromReplica(t, tc.replicaIndex) + pos := replication.EncodePosition(fullBackupPos) + rowsPerPosition[pos] = len(msgs) + + lastBackupPos = fullBackupPos + case operationIncrementalBackup: + var incrementalFromPos replication.Position // keep zero, we will use "auto" + manifest, _ := TestReplicaIncrementalBackup(t, tc.replicaIndex, incrementalFromPos, tc.expectError) + if tc.expectError != "" { + return + } + defer func() { + lastBackupPos = manifest.Position + }() + require.False(t, manifest.FromPosition.IsZero()) + require.NotEqual(t, manifest.Position, manifest.FromPosition) + require.True(t, manifest.Position.GTIDSet.Union(manifest.PurgedPosition.GTIDSet).Contains(manifest.FromPosition.GTIDSet)) + + gtidPurgedPos, err := replication.ParsePosition(replication.Mysql56FlavorID, GetReplicaGtidPurged(t, tc.replicaIndex)) + require.NoError(t, err) + fromPositionIncludingPurged := 
manifest.FromPosition.GTIDSet.Union(gtidPurgedPos.GTIDSet) + + require.True(t, lastBackupPos.GTIDSet.Contains(fromPositionIncludingPurged), "expected: %v to contain %v", lastBackupPos.GTIDSet, fromPositionIncludingPurged) + case operationRestore: + TestReplicaFullRestore(t, tc.replicaIndex, "") + // should return into replication stream + insertRowAndWait(t, tc.replicaIndex, "post-restore-check") + default: + require.FailNowf(t, "unknown operation type", "operation: %d", tc.operationType) + } + }) + }) + } + }) +} diff --git a/go/vt/mysqlctl/backupengine.go b/go/vt/mysqlctl/backupengine.go index f0255f82b49..3bf1560f8b3 100644 --- a/go/vt/mysqlctl/backupengine.go +++ b/go/vt/mysqlctl/backupengine.go @@ -279,6 +279,9 @@ type BackupManifest struct { // which incremental changes are backed up. FromPosition replication.Position + // FromBackup indicates the backup name on which this incremental backup is based, assuming this is an incremental backup with "auto" pos + FromBackup string + // Incremental indicates whether this is an incremental backup Incremental bool @@ -401,9 +404,16 @@ func (p *RestorePath) String() string { // FindLatestSuccessfulBackup returns the handle and manifest for the last good backup, // which can be either full or increment -func FindLatestSuccessfulBackup(ctx context.Context, logger logutil.Logger, bhs []backupstorage.BackupHandle) (backupstorage.BackupHandle, *BackupManifest, error) { +func FindLatestSuccessfulBackup(ctx context.Context, logger logutil.Logger, bhs []backupstorage.BackupHandle, excludeBackupName string) (backupstorage.BackupHandle, *BackupManifest, error) { for index := len(bhs) - 1; index >= 0; index-- { bh := bhs[index] + if bh.Name() == excludeBackupName { + // skip this bh. Use case: in an incremental backup, as we look for previous successful backups, + // the new incremental backup handle is partial: the directory exists, it will show in ListBackups, but + // the MANIFEST file does not exist yet.
So we avoid the errors/warnings associated with reading this partial backup, + // and just skip it. + continue + } // Check that the backup MANIFEST exists and can be successfully decoded. bm, err := GetBackupManifest(ctx, bh) if err != nil { @@ -415,6 +425,29 @@ func FindLatestSuccessfulBackup(ctx context.Context, logger logutil.Logger, bhs return nil, nil, ErrNoCompleteBackup } +// FindLatestSuccessfulBackupPosition returns the position of the last known successful backup +func FindLatestSuccessfulBackupPosition(ctx context.Context, params BackupParams, excludeBackupName string) (backupName string, pos replication.Position, err error) { + bs, err := backupstorage.GetBackupStorage() + if err != nil { + return "", pos, err + } + defer bs.Close() + + // Backups are stored in a directory structure that starts with + // / + backupDir := GetBackupDir(params.Keyspace, params.Shard) + bhs, err := bs.ListBackups(ctx, backupDir) + if err != nil { + return "", pos, vterrors.Wrap(err, "ListBackups failed") + } + bh, manifest, err := FindLatestSuccessfulBackup(ctx, params.Logger, bhs, excludeBackupName) + if err != nil { + return "", pos, vterrors.Wrap(err, "FindLatestSuccessfulBackup failed") + } + pos = manifest.Position + return bh.Name(), pos, nil +} + // FindBackupToRestore returns a path, a sequence of backup handles, to be restored. // The returned handles stand for valid backups with complete manifests. func FindBackupToRestore(ctx context.Context, params RestoreParams, bhs []backupstorage.BackupHandle) (restorePath *RestorePath, err error) { diff --git a/go/vt/mysqlctl/binlogs_gtid.go b/go/vt/mysqlctl/binlogs_gtid.go index 20fc3dcd32e..3ea48663578 100644 --- a/go/vt/mysqlctl/binlogs_gtid.go +++ b/go/vt/mysqlctl/binlogs_gtid.go @@ -94,13 +94,21 @@ func ChooseBinlogsForIncrementalBackup( // know this when we look into the _next_ binlog file's Previous-GTIDs. continue } + // Got here? 
This means backupFromGTIDSet does not fully contain the current binlog's Previous-GTIDs. + In other words, Previous-GTIDs have entries on top of backupFromGTIDSet. Which suggests that these + entries were added by the previous binary log. if i == 0 { + // Ummm... there _is no_ previous binary log. return nil, "", "", vterrors.Errorf(vtrpc.Code_FAILED_PRECONDITION, "Required entries have been purged. Oldest binary log %v expects entries not found in backup pos. Expected pos=%v", binlog, previousGTIDsPos) } - if !prevGTIDsUnion.Union(purgedGTIDSet).Contains(backupFromGTIDSet) { + // The other thing to validate is that we can't allow a situation where the backup-GTIDs have entries not covered + by our binary log's Previous-GTIDs (padded with purged GTIDs). Because that means we can't possibly restore to + such position. + prevGTIDsUnionPurged := prevGTIDsUnion.Union(purgedGTIDSet) + if !prevGTIDsUnionPurged.Contains(backupFromGTIDSet) { return nil, "", "", vterrors.Errorf(vtrpc.Code_FAILED_PRECONDITION, - "Mismatching GTID entries. Requested backup pos has entries not found in the binary logs, and binary logs have entries not found in the requested backup pos. Neither fully contains the other. Requested pos=%v, binlog pos=%v", - backupFromGTIDSet, previousGTIDsPos.GTIDSet) + "Mismatching GTID entries. Requested backup pos has entries not found in the binary logs, and binary logs have entries not found in the requested backup pos. Neither fully contains the other.\n- Requested pos=%v\n- binlog pos=%v\n- purgedGTIDSet=%v\n- union=%v\n- union purged=%v", + backupFromGTIDSet, previousGTIDsPos.GTIDSet, purgedGTIDSet, prevGTIDsUnion, prevGTIDsUnionPurged) } // We begin with the previous binary log, and we ignore the last binary log, because it's still open and being written to.
binaryLogsToBackup = binaryLogs[i-1 : len(binaryLogs)-1] diff --git a/go/vt/mysqlctl/binlogs_gtid_test.go b/go/vt/mysqlctl/binlogs_gtid_test.go index 0d57928c49e..655208e908e 100644 --- a/go/vt/mysqlctl/binlogs_gtid_test.go +++ b/go/vt/mysqlctl/binlogs_gtid_test.go @@ -112,6 +112,46 @@ func TestChooseBinlogsForIncrementalBackup(t *testing.T) { backupPos: "16b1039f-0000-0000-0000-000000000000:1-63", expectError: "Mismatching GTID entries", }, + { + name: "empty previous GTIDs in first binlog with gap, with good backup pos", + previousGTIDs: map[string]string{ + "vt-bin.000001": "", + "vt-bin.000002": "16b1039f-22b6-11ed-b765-0a43f95f28a3:40-60", + "vt-bin.000003": "16b1039f-22b6-11ed-b765-0a43f95f28a3:40-60", + "vt-bin.000004": "16b1039f-22b6-11ed-b765-0a43f95f28a3:40-78", + "vt-bin.000005": "16b1039f-22b6-11ed-b765-0a43f95f28a3:40-243", + "vt-bin.000006": "16b1039f-22b6-11ed-b765-0a43f95f28a3:40-331", + }, + backupPos: "16b1039f-22b6-11ed-b765-0a43f95f28a3:40-78", + expectBinlogs: []string{"vt-bin.000004", "vt-bin.000005"}, + }, + { + name: "empty previous GTIDs in first binlog with gap, and without gtid_purged", + previousGTIDs: map[string]string{ + "vt-bin.000001": "", + "vt-bin.000002": "16b1039f-22b6-11ed-b765-0a43f95f28a3:40-60", + "vt-bin.000003": "16b1039f-22b6-11ed-b765-0a43f95f28a3:40-60", + "vt-bin.000004": "16b1039f-22b6-11ed-b765-0a43f95f28a3:40-78", + "vt-bin.000005": "16b1039f-22b6-11ed-b765-0a43f95f28a3:40-243", + "vt-bin.000006": "16b1039f-22b6-11ed-b765-0a43f95f28a3:40-331", + }, + backupPos: "16b1039f-22b6-11ed-b765-0a43f95f28a3:1-78", + expectError: "Mismatching GTID entries", + }, + { + name: "empty previous GTIDs in first binlog but with proper gtid_purged", + previousGTIDs: map[string]string{ + "vt-bin.000001": "", + "vt-bin.000002": "16b1039f-22b6-11ed-b765-0a43f95f28a3:40-60", + "vt-bin.000003": "16b1039f-22b6-11ed-b765-0a43f95f28a3:40-60", + "vt-bin.000004": "16b1039f-22b6-11ed-b765-0a43f95f28a3:40-78", + "vt-bin.000005": 
"16b1039f-22b6-11ed-b765-0a43f95f28a3:40-243", + "vt-bin.000006": "16b1039f-22b6-11ed-b765-0a43f95f28a3:40-331", + }, + backupPos: "16b1039f-22b6-11ed-b765-0a43f95f28a3:1-78", + gtidPurged: "16b1039f-22b6-11ed-b765-0a43f95f28a3:1-40", + expectBinlogs: []string{"vt-bin.000004", "vt-bin.000005"}, + }, { name: "empty previous GTIDs in first binlog covering backup pos", previousGTIDs: map[string]string{ diff --git a/go/vt/mysqlctl/builtinbackupengine.go b/go/vt/mysqlctl/builtinbackupengine.go index e8aac724442..8442c855c0a 100644 --- a/go/vt/mysqlctl/builtinbackupengine.go +++ b/go/vt/mysqlctl/builtinbackupengine.go @@ -244,6 +244,18 @@ func (be *BuiltinBackupEngine) executeIncrementalBackup(ctx context.Context, par return false, vterrors.Wrap(err, "can't get MySQL version") } + var fromBackupName string + if params.IncrementalFromPos == autoIncrementalFromPos { + params.Logger.Infof("auto evaluating incremental_from_pos") + backupName, pos, err := FindLatestSuccessfulBackupPosition(ctx, params, bh.Name()) + if err != nil { + return false, err + } + fromBackupName = backupName + params.IncrementalFromPos = replication.EncodePosition(pos) + params.Logger.Infof("auto evaluated incremental_from_pos: %s", params.IncrementalFromPos) + } + // @@gtid_purged getPurgedGTIDSet := func() (replication.Position, replication.Mysql56GTIDSet, error) { gtidPurged, err := params.Mysqld.GetGTIDPurged(ctx) @@ -256,33 +268,6 @@ func (be *BuiltinBackupEngine) executeIncrementalBackup(ctx context.Context, par } return gtidPurged, purgedGTIDSet, nil } - gtidPurged, purgedGTIDSet, err := getPurgedGTIDSet() - if err != nil { - return false, err - } - - if params.IncrementalFromPos == autoIncrementalFromPos { - params.Logger.Infof("auto evaluating incremental_from_pos") - bs, err := backupstorage.GetBackupStorage() - if err != nil { - return false, err - } - defer bs.Close() - - // Backups are stored in a directory structure that starts with - // / - backupDir := GetBackupDir(params.Keyspace, 
params.Shard) - bhs, err := bs.ListBackups(ctx, backupDir) - if err != nil { - return false, vterrors.Wrap(err, "ListBackups failed") - } - _, manifest, err := FindLatestSuccessfulBackup(ctx, params.Logger, bhs) - if err != nil { - return false, vterrors.Wrap(err, "FindLatestSuccessfulBackup failed") - } - params.IncrementalFromPos = replication.EncodePosition(manifest.Position) - params.Logger.Infof("auto evaluated incremental_from_pos: %s", params.IncrementalFromPos) - } // params.IncrementalFromPos is a string. We want to turn that into a MySQL GTID backupFromGTIDSet, err := getIncrementalFromPosGTIDSet(params.IncrementalFromPos) @@ -305,6 +290,13 @@ func (be *BuiltinBackupEngine) executeIncrementalBackup(ctx context.Context, par if err != nil { return false, vterrors.Wrapf(err, "cannot get binary logs in incremental backup") } + // gtid_purged is important information. The restore flow uses this info to complement binary logs' Previous-GTIDs. + // It is important to only get gtid_purged _after_ we've rotated into the new binary log, because the `FLUSH BINARY LOGS` + // command may also purge old logs, hence affecting the value of gtid_purged. 
+ gtidPurged, purgedGTIDSet, err := getPurgedGTIDSet() + if err != nil { + return false, err + } previousGTIDs := map[string]string{} getBinlogPreviousGTIDs := func(ctx context.Context, binlog string) (gtids string, err error) { gtids, ok := previousGTIDs[binlog] @@ -331,6 +323,14 @@ func (be *BuiltinBackupEngine) executeIncrementalBackup(ctx context.Context, par if err != nil { return false, vterrors.Wrapf(err, "cannot parse position %v", incrementalBackupToGTID) } + // The backup position is the GTIDSet of the last binary log (taken from Previous-GTIDs of the one-next binary log), and we + // also include gtid_purged ; this complies with the "standard" way MySQL "thinks" about GTIDs: there's gtid_executed, which includes + // everything that's ever been applied, and a subset of that is gtid_purged, which are the events no longer available in binary logs. + // When we consider Vitess incremental backups, what's important for us is "what's the GTIDSet that's true when this backup was taken, + // and which will be true when we restore this backup". The answer to this is the GTIDSet that includes the purged GTIDs. + // It's also nice for incremental backups that are taken on _other_ tablets, so that they don't need to understand what exactly was purged + // on _this_ tablet. They don't care, all they want to know is "what GTIDSet can we get from this". + incrementalBackupToPosition.GTIDSet = incrementalBackupToPosition.GTIDSet.Union(gtidPurged.GTIDSet) req := &mysqlctl.ReadBinlogFilesTimestampsRequest{} for _, binlogFile := range binaryLogsToBackup { fe := FileEntry{Base: backupBinlogDir, Name: binlogFile} @@ -362,7 +362,7 @@ func (be *BuiltinBackupEngine) executeIncrementalBackup(ctx context.Context, par // incrementalBackupFromGTID is the "previous GTIDs" of the first binlog file we back up. // It is a fact that incrementalBackupFromGTID is earlier or equal to params.IncrementalFromPos. 
// In the backup manifest file, we document incrementalBackupFromGTID, not the user's requested position. - if err := be.backupFiles(ctx, params, bh, incrementalBackupToPosition, gtidPurged, incrementalBackupFromPosition, binaryLogsToBackup, serverUUID, mysqlVersion, incrDetails); err != nil { + if err := be.backupFiles(ctx, params, bh, incrementalBackupToPosition, gtidPurged, incrementalBackupFromPosition, fromBackupName, binaryLogsToBackup, serverUUID, mysqlVersion, incrDetails); err != nil { return false, err } return true, nil @@ -473,7 +473,7 @@ func (be *BuiltinBackupEngine) executeFullBackup(ctx context.Context, params Bac } // Backup everything, capture the error. - backupErr := be.backupFiles(ctx, params, bh, replicationPosition, gtidPurgedPosition, replication.Position{}, nil, serverUUID, mysqlVersion, nil) + backupErr := be.backupFiles(ctx, params, bh, replicationPosition, gtidPurgedPosition, replication.Position{}, "", nil, serverUUID, mysqlVersion, nil) usable := backupErr == nil // Try to restart mysqld, use background context in case we timed out the original context @@ -553,9 +553,10 @@ func (be *BuiltinBackupEngine) backupFiles( ctx context.Context, params BackupParams, bh backupstorage.BackupHandle, - replicationPosition replication.Position, + backupPosition replication.Position, purgedPosition replication.Position, fromPosition replication.Position, + fromBackupName string, binlogFiles []string, serverUUID string, mysqlVersion string, @@ -634,7 +635,8 @@ func (be *BuiltinBackupEngine) backupFiles( return vterrors.Wrapf(err, "cannot add %v to backup", backupManifestFileName) } defer func() { - if closeErr := wc.Close(); finalErr == nil { + closeErr := wc.Close() + if finalErr == nil { finalErr = closeErr } }() @@ -644,9 +646,10 @@ func (be *BuiltinBackupEngine) backupFiles( // Common base fields BackupManifest: BackupManifest{ BackupMethod: builtinBackupEngineName, - Position: replicationPosition, + Position: backupPosition, PurgedPosition: 
purgedPosition, FromPosition: fromPosition, + FromBackup: fromBackupName, Incremental: !fromPosition.IsZero(), ServerUUID: serverUUID, TabletAlias: params.TabletAlias, diff --git a/go/vt/mysqlctl/mysqld.go b/go/vt/mysqlctl/mysqld.go index 1f8c03a4d92..cb360f50e7e 100644 --- a/go/vt/mysqlctl/mysqld.go +++ b/go/vt/mysqlctl/mysqld.go @@ -1362,6 +1362,7 @@ func (mysqld *Mysqld) ReadBinlogFilesTimestamps(ctx context.Context, req *mysqlc } scanner := bufio.NewScanner(pipe) scanComplete := make(chan error) + intentionalKill := false scan := func() { defer close(scanComplete) // Read line by line and process it @@ -1378,6 +1379,10 @@ func (mysqld *Mysqld) ReadBinlogFilesTimestamps(ctx context.Context, req *mysqlc matchFound = true } if found && stopAtFirst { + // Found the first timestamp and it's all we need. We won't scan any further and so we should also + // kill mysqlbinlog (otherwise it keeps waiting until we've read the entire pipe). + intentionalKill = true + mysqlbinlogCmd.Process.Kill() return } } @@ -1386,7 +1391,7 @@ func (mysqld *Mysqld) ReadBinlogFilesTimestamps(ctx context.Context, req *mysqlc return matchedTime, false, err } go scan() - if err := mysqlbinlogCmd.Wait(); err != nil { + if err := mysqlbinlogCmd.Wait(); err != nil && !intentionalKill { return matchedTime, false, vterrors.Wrapf(err, "waiting on mysqlbinlog command in ReadBinlogFilesTimestamps") } if err := <-scanComplete; err != nil {