Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 2 additions & 0 deletions go/flags/endtoend/vtcombo.txt
Original file line number Diff line number Diff line change
Expand Up @@ -300,6 +300,8 @@ Flags:
--relay_log_max_size int Maximum buffer size (in bytes) for VReplication target buffering. If single rows are larger than this, a single row is buffered at a time. (default 250000)
--remote_operation_timeout duration time to wait for a remote operation (default 15s)
--replication_connect_retry duration how long to wait in between replica reconnect attempts. Only precise to the second. (default 10s)
--restore-to-pos string (init incremental restore parameter) if set, run a point in time recovery that ends with the given position. This will attempt to use one full backup followed by zero or more incremental backups
--restore-to-timestamp string (init incremental restore parameter) if set, run a point in time recovery that restores up to the given timestamp, if possible. Given timestamp in RFC3339 format. Example: '2006-01-02T15:04:05Z07:00'
--restore_concurrency int (init restore parameter) how many concurrent files to restore at once (default 4)
--restore_from_backup (init restore parameter) will check BackupStorage for a recent backup at startup and start there
--restore_from_backup_ts string (init restore parameter) if set, restore the latest backup taken at or before this timestamp. Example: '2021-04-29.133050'
Expand Down
2 changes: 2 additions & 0 deletions go/flags/endtoend/vttablet.txt
Original file line number Diff line number Diff line change
Expand Up @@ -294,6 +294,8 @@ Flags:
--relay_log_max_size int Maximum buffer size (in bytes) for VReplication target buffering. If single rows are larger than this, a single row is buffered at a time. (default 250000)
--remote_operation_timeout duration time to wait for a remote operation (default 15s)
--replication_connect_retry duration how long to wait in between replica reconnect attempts. Only precise to the second. (default 10s)
--restore-to-pos string (init incremental restore parameter) if set, run a point in time recovery that ends with the given position. This will attempt to use one full backup followed by zero or more incremental backups
--restore-to-timestamp string (init incremental restore parameter) if set, run a point in time recovery that restores up to the given timestamp, if possible. Given timestamp in RFC3339 format. Example: '2006-01-02T15:04:05Z07:00'
--restore_concurrency int (init restore parameter) how many concurrent files to restore at once (default 4)
--restore_from_backup (init restore parameter) will check BackupStorage for a recent backup at startup and start there
--restore_from_backup_ts string (init restore parameter) if set, restore the latest backup taken at or before this timestamp. Example: '2021-04-29.133050'
Expand Down
20 changes: 17 additions & 3 deletions go/test/endtoend/backup/pitr/backup_pitr_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -22,7 +22,21 @@ import (
backup "vitess.io/vitess/go/test/endtoend/backup/vtctlbackup"
)

// TestIncrementalBackupAndRestoreToPos
// TestIncrementalBackupAndRestoreToPos - tests incremental backups and restores.
// The general outline of the test:
// - Generate some schema with data
// - Take a full backup
// - Proceed to take a series of incremental backups. In between, inject data (insert rows), and keep record
// of which data (number of rows) is present in each backup, and at which position.
// - Expect backups success/failure per scenario
// - Next up, we start testing restores. Randomly pick recorded positions and restore to those points in time.
// - In each restore, expect to find the data (number of rows) recorded for said position
// - Some restores should fail because the position exceeds the last binlog
// - Do so for all recorded positions.
// - Then, a 2nd round where some backups are purged -- this tests to see that we're still able to find a restore path
// (of course we only delete backups that still leave us with valid restore paths).
// - Last, create a new tablet with --restore_from_backup --restore-to-pos and see that it bootstraps with restored data
// and that it ends up in DRAINED type
func TestIncrementalBackupAndRestoreToPos(t *testing.T) {
tcase := &backup.PITRTestCase{
Name: "BuiltinBackup",
Expand All @@ -45,8 +59,8 @@ func TestIncrementalBackupAndRestoreToPos(t *testing.T) {
// - Do so for all recorded timestamps.
// - Then, a 2nd round where some backups are purged -- this tests to see that we're still able to find a restore path
// (of course we only delete backups that still leave us with valid restore paths).
//
// All of the above is done for BuiltinBackup, XtraBackup, Mysqlctld (which is technically builtin)
// - Last, create a new tablet with --restore_from_backup --restore-to-timestamp and see that it bootstraps with restored data
// and that it ends up in DRAINED type
func TestIncrementalBackupAndRestoreToTimestamp(t *testing.T) {
tcase := &backup.PITRTestCase{
Name: "BuiltinBackup",
Expand Down
39 changes: 31 additions & 8 deletions go/test/endtoend/backup/vtctlbackup/backup_utils.go
Original file line number Diff line number Diff line change
Expand Up @@ -61,6 +61,7 @@ var (
primary *cluster.Vttablet
replica1 *cluster.Vttablet
replica2 *cluster.Vttablet
replica3 *cluster.Vttablet
localCluster *cluster.LocalProcessCluster
newInitDBFile string
useXtrabackup bool
Expand Down Expand Up @@ -90,6 +91,7 @@ var (
primary key (id)
) Engine=InnoDB
`
SetupReplica3Tablet func(extraArgs []string) (*cluster.Vttablet, error)
)

type CompressionDetails struct {
Expand Down Expand Up @@ -170,9 +172,10 @@ func LaunchCluster(setupType int, streamMode string, stripes int, cDetails *Comp
0: "primary",
1: "replica",
2: "rdonly",
3: "spare",
}
for i := 0; i < 3; i++ {
tabletType := tabletTypes[i]

createTablet := func(tabletType string) error {
tablet := localCluster.NewVttabletInstance(tabletType, 0, cell)
tablet.VttabletProcess = localCluster.VtprocessInstanceFromVttablet(tablet, shard.Name, keyspaceName)
tablet.VttabletProcess.DbPassword = dbPassword
Expand All @@ -182,33 +185,40 @@ func LaunchCluster(setupType int, streamMode string, stripes int, cDetails *Comp
if setupType == Mysqlctld {
mysqlctldProcess, err := cluster.MysqlCtldProcessInstance(tablet.TabletUID, tablet.MySQLPort, localCluster.TmpDirectory)
if err != nil {
return 1, err
return err
}
tablet.MysqlctldProcess = *mysqlctldProcess
tablet.MysqlctldProcess.InitDBFile = newInitDBFile
tablet.MysqlctldProcess.ExtraArgs = extraArgs
tablet.MysqlctldProcess.Password = tablet.VttabletProcess.DbPassword
if err := tablet.MysqlctldProcess.Start(); err != nil {
return 1, err
return err
}
shard.Vttablets = append(shard.Vttablets, tablet)
continue
return nil
}

mysqlctlProcess, err := cluster.MysqlCtlProcessInstance(tablet.TabletUID, tablet.MySQLPort, localCluster.TmpDirectory)
if err != nil {
return 1, err
return err
}
tablet.MysqlctlProcess = *mysqlctlProcess
tablet.MysqlctlProcess.InitDBFile = newInitDBFile
tablet.MysqlctlProcess.ExtraArgs = extraArgs
proc, err := tablet.MysqlctlProcess.StartProcess()
if err != nil {
return 1, err
return err
}
mysqlProcs = append(mysqlProcs, proc)

shard.Vttablets = append(shard.Vttablets, tablet)
return nil
}
for i := 0; i < 4; i++ {
tabletType := tabletTypes[i]
if err := createTablet(tabletType); err != nil {
return 1, err
}
}
for _, proc := range mysqlProcs {
if err := proc.Wait(); err != nil {
Expand All @@ -218,6 +228,7 @@ func LaunchCluster(setupType int, streamMode string, stripes int, cDetails *Comp
primary = shard.Vttablets[0]
replica1 = shard.Vttablets[1]
replica2 = shard.Vttablets[2]
replica3 = shard.Vttablets[3]

if err := localCluster.VtctlclientProcess.InitTablet(primary, cell, keyspaceName, hostname, shard.Name); err != nil {
return 1, err
Expand All @@ -234,12 +245,20 @@ func LaunchCluster(setupType int, streamMode string, stripes int, cDetails *Comp
return 1, err
}

for _, tablet := range []*cluster.Vttablet{primary, replica1, replica2} {
for _, tablet := range []*cluster.Vttablet{primary, replica1, replica2} { // we don't start replica3 yet
if err := tablet.VttabletProcess.Setup(); err != nil {
return 1, err
}
}

SetupReplica3Tablet = func(extraArgs []string) (*cluster.Vttablet, error) {
replica3.VttabletProcess.ExtraArgs = append(replica3.VttabletProcess.ExtraArgs, extraArgs...)
if err := replica3.VttabletProcess.Setup(); err != nil {
return replica3, err
}
return replica3, nil
}

if err := localCluster.VtctlclientProcess.InitShardPrimary(keyspaceName, shard.Name, cell, primary.TabletUID); err != nil {
return 1, err
}
Expand Down Expand Up @@ -1140,6 +1159,8 @@ func getReplica(t *testing.T, replicaIndex int) *cluster.Vttablet {
return replica1
case 1:
return replica2
case 2:
return replica3
default:
assert.Failf(t, "invalid replica index", "index=%d", replicaIndex)
return nil
Expand Down Expand Up @@ -1290,6 +1311,7 @@ func TestReplicaRestoreToPos(t *testing.T, replicaIndex int, restoreToPos replic
}
require.NoErrorf(t, err, "output: %v", output)
verifyTabletRestoreStats(t, replica.VttabletProcess.GetVars())
checkTabletType(t, replica1.Alias, topodata.TabletType_DRAINED)
}

func TestReplicaRestoreToTimestamp(t *testing.T, restoreToTimestamp time.Time, expectError string) {
Expand All @@ -1303,6 +1325,7 @@ func TestReplicaRestoreToTimestamp(t *testing.T, restoreToTimestamp time.Time, e
}
require.NoErrorf(t, err, "output: %v", output)
verifyTabletRestoreStats(t, replica1.VttabletProcess.GetVars())
checkTabletType(t, replica1.Alias, topodata.TabletType_DRAINED)
}

func verifyTabletBackupStats(t *testing.T, vars map[string]any) {
Expand Down
53 changes: 53 additions & 0 deletions go/test/endtoend/backup/vtctlbackup/pitr_test_framework.go
Original file line number Diff line number Diff line change
Expand Up @@ -34,6 +34,7 @@ import (

var (
gracefulPostBackupDuration = 10 * time.Millisecond
backupTimeoutDuration = 3 * time.Minute
)

const (
Expand Down Expand Up @@ -225,6 +226,7 @@ func ExecTestIncrementalBackupAndRestoreToPos(t *testing.T, tcase *PITRTestCase)
})
}

sampleTestedBackupPos := ""
testRestores := func(t *testing.T) {
for _, r := range rand.Perm(len(backupPositions)) {
pos := backupPositions[r]
Expand All @@ -237,6 +239,9 @@ func ExecTestIncrementalBackupAndRestoreToPos(t *testing.T, tcase *PITRTestCase)
count, ok := rowsPerPosition[pos]
require.True(t, ok)
assert.Equalf(t, count, len(msgs), "messages: %v", msgs)
if sampleTestedBackupPos == "" {
sampleTestedBackupPos = pos
}
})
}
}
Expand All @@ -252,6 +257,27 @@ func ExecTestIncrementalBackupAndRestoreToPos(t *testing.T, tcase *PITRTestCase)
t.Run("PITR-2", func(t *testing.T) {
testRestores(t)
})
// Test that we can create a new tablet with --restore_from_backup --restore-to-pos and that it bootstraps
// via PITR and ends up in DRAINED type.
t.Run("init tablet PITR", func(t *testing.T) {
require.NotEmpty(t, sampleTestedBackupPos)

var tablet *cluster.Vttablet

t.Run(fmt.Sprintf("init from backup pos %s", sampleTestedBackupPos), func(t *testing.T) {
tablet, err = SetupReplica3Tablet([]string{"--restore-to-pos", sampleTestedBackupPos})
assert.NoError(t, err)
})
t.Run("wait for drained", func(t *testing.T) {
err = tablet.VttabletProcess.WaitForTabletTypesForTimeout([]string{"drained"}, backupTimeoutDuration)
assert.NoError(t, err)
})
t.Run(fmt.Sprintf("validate %d rows", rowsPerPosition[sampleTestedBackupPos]), func(t *testing.T) {
require.NotZero(t, rowsPerPosition[sampleTestedBackupPos])
msgs := ReadRowsFromReplica(t, 2)
assert.Equal(t, rowsPerPosition[sampleTestedBackupPos], len(msgs))
})
})
})
}

Expand Down Expand Up @@ -415,6 +441,7 @@ func ExecTestIncrementalBackupAndRestoreToTimestamp(t *testing.T, tcase *PITRTes
})
}

sampleTestedBackupIndex := -1
testRestores := func(t *testing.T) {
numFailedRestores := 0
numSuccessfulRestores := 0
Expand All @@ -433,6 +460,9 @@ func ExecTestIncrementalBackupAndRestoreToTimestamp(t *testing.T, tcase *PITRTes
msgs := ReadRowsFromReplica(t, 0)
assert.Equalf(t, testedBackup.rows, len(msgs), "messages: %v", msgs)
numSuccessfulRestores++
if sampleTestedBackupIndex < 0 {
sampleTestedBackupIndex = backupIndex
}
} else {
numFailedRestores++
}
Expand All @@ -454,6 +484,29 @@ func ExecTestIncrementalBackupAndRestoreToTimestamp(t *testing.T, tcase *PITRTes
t.Run("PITR-2", func(t *testing.T) {
testRestores(t)
})
// Test that we can create a new tablet with --restore_from_backup --restore-to-timestamp and that it bootstraps
// via PITR and ends up in DRAINED type.
t.Run("init tablet PITR", func(t *testing.T) {
require.GreaterOrEqual(t, sampleTestedBackupIndex, 0)
sampleTestedBackup := testedBackups[sampleTestedBackupIndex]
restoreToTimestampArg := mysqlctl.FormatRFC3339(sampleTestedBackup.postTimestamp)

var tablet *cluster.Vttablet

t.Run(fmt.Sprintf("init from backup num %d", sampleTestedBackupIndex), func(t *testing.T) {
tablet, err = SetupReplica3Tablet([]string{"--restore-to-timestamp", restoreToTimestampArg})
assert.NoError(t, err)
})
t.Run("wait for drained", func(t *testing.T) {
err = tablet.VttabletProcess.WaitForTabletTypesForTimeout([]string{"drained"}, backupTimeoutDuration)
assert.NoError(t, err)
})
t.Run(fmt.Sprintf("validate %d rows", sampleTestedBackup.rows), func(t *testing.T) {
require.NotZero(t, sampleTestedBackup.rows)
msgs := ReadRowsFromReplica(t, 2)
assert.Equal(t, sampleTestedBackup.rows, len(msgs))
})
})
})
}

Expand Down
38 changes: 31 additions & 7 deletions go/vt/vttablet/tabletmanager/restore.go
Original file line number Diff line number Diff line change
Expand Up @@ -71,7 +71,18 @@ func registerRestoreFlags(fs *pflag.FlagSet) {
}

var (
// Flags for PITR
// Flags for incremental restore (PITR) - new iteration
restoreToTimestampStr string
restoreToPos string
)

func registerIncrementalRestoreFlags(fs *pflag.FlagSet) {
fs.StringVar(&restoreToTimestampStr, "restore-to-timestamp", restoreToTimestampStr, "(init incremental restore parameter) if set, run a point in time recovery that restores up to the given timestamp, if possible. Given timestamp in RFC3339 format. Example: '2006-01-02T15:04:05Z07:00'")
fs.StringVar(&restoreToPos, "restore-to-pos", restoreToPos, "(init incremental restore parameter) if set, run a point in time recovery that ends with the given position. This will attempt to use one full backup followed by zero or more incremental backups")
}

var (
// Flags for PITR - old iteration
binlogHost string
binlogPort int
binlogUser string
Expand Down Expand Up @@ -99,6 +110,9 @@ func init() {
servenv.OnParseFor("vtcombo", registerRestoreFlags)
servenv.OnParseFor("vttablet", registerRestoreFlags)

servenv.OnParseFor("vtcombo", registerIncrementalRestoreFlags)
servenv.OnParseFor("vttablet", registerIncrementalRestoreFlags)

servenv.OnParseFor("vtcombo", registerPointInTimeRestoreFlags)
servenv.OnParseFor("vttablet", registerPointInTimeRestoreFlags)

Expand All @@ -110,7 +124,14 @@ func init() {
// It will either work, fail gracefully, or return
// an error in case of a non-recoverable error.
// It takes the action lock so no RPC interferes.
func (tm *TabletManager) RestoreData(ctx context.Context, logger logutil.Logger, waitForBackupInterval time.Duration, deleteBeforeRestore bool, backupTime time.Time) error {
func (tm *TabletManager) RestoreData(
ctx context.Context,
logger logutil.Logger,
waitForBackupInterval time.Duration,
deleteBeforeRestore bool,
backupTime time.Time,
restoreToTimetamp time.Time,
restoreToPos string) error {
if err := tm.lock(ctx); err != nil {
return err
}
Expand Down Expand Up @@ -155,7 +176,9 @@ func (tm *TabletManager) RestoreData(ctx context.Context, logger logutil.Logger,
startTime = time.Now()

req := &tabletmanagerdatapb.RestoreFromBackupRequest{
BackupTime: protoutil.TimeToProto(backupTime),
BackupTime: protoutil.TimeToProto(backupTime),
RestoreToPos: restoreToPos,
RestoreToTimestamp: protoutil.TimeToProto(restoreToTimetamp),
}
err = tm.restoreDataLocked(ctx, logger, waitForBackupInterval, deleteBeforeRestore, req)
if err != nil {
Expand Down Expand Up @@ -207,17 +230,18 @@ func (tm *TabletManager) restoreDataLocked(ctx context.Context, logger logutil.L
DryRun: request.DryRun,
Stats: backupstats.RestoreStats(),
}
if request.RestoreToPos != "" && !protoutil.TimeFromProto(request.RestoreToTimestamp).UTC().IsZero() {
return vterrors.Errorf(vtrpcpb.Code_INVALID_ARGUMENT, "--restore_to_pos and --restore_to_timestamp are mutually exclusive")
restoreToTimestamp := protoutil.TimeFromProto(request.RestoreToTimestamp).UTC()
if request.RestoreToPos != "" && !restoreToTimestamp.IsZero() {
return vterrors.Errorf(vtrpcpb.Code_INVALID_ARGUMENT, "--restore-to-pos and --restore-to-timestamp are mutually exclusive")
}
if request.RestoreToPos != "" {
pos, err := replication.DecodePosition(request.RestoreToPos)
if err != nil {
return vterrors.Wrapf(err, "restore failed: unable to decode --restore_to_pos: %s", request.RestoreToPos)
return vterrors.Wrapf(err, "restore failed: unable to decode --restore-to-pos: %s", request.RestoreToPos)
}
params.RestoreToPos = pos
}
if restoreToTimestamp := protoutil.TimeFromProto(request.RestoreToTimestamp).UTC(); !restoreToTimestamp.IsZero() {
if !restoreToTimestamp.IsZero() {
// Restore to given timestamp
params.RestoreToTimestamp = restoreToTimestamp
}
Expand Down
Loading