diff --git a/go/cmd/vtbackup/vtbackup.go b/go/cmd/vtbackup/vtbackup.go index 3f9eecb8fd2..0abdab9522a 100644 --- a/go/cmd/vtbackup/vtbackup.go +++ b/go/cmd/vtbackup/vtbackup.go @@ -240,6 +240,16 @@ func takeBackup(ctx context.Context, topoServer *topo.Server, backupStorage back dbName = fmt.Sprintf("vt_%s", *initKeyspace) } + backupParams := mysqlctl.BackupParams{ + Cnf: mycnf, + Mysqld: mysqld, + Logger: logutil.NewConsoleLogger(), + Concurrency: *concurrency, + HookExtraEnv: extraEnv, + TopoServer: topoServer, + Keyspace: *initKeyspace, + Shard: *initShard, + } // In initial_backup mode, just take a backup of this empty database. if *initialBackup { // Take a backup of this empty DB without restoring anything. @@ -257,7 +267,7 @@ func takeBackup(ctx context.Context, topoServer *topo.Server, backupStorage back } // Now we're ready to take the backup. name := backupName(time.Now(), tabletAlias) - if err := mysqlctl.Backup(ctx, mycnf, mysqld, logutil.NewConsoleLogger(), backupDir, name, *concurrency, extraEnv); err != nil { + if err := mysqlctl.Backup(ctx, backupDir, name, backupParams); err != nil { return fmt.Errorf("backup failed: %v", err) } log.Info("Initial backup successful.") @@ -265,7 +275,18 @@ func takeBackup(ctx context.Context, topoServer *topo.Server, backupStorage back } log.Infof("Restoring latest backup from directory %v", backupDir) - restorePos, err := mysqlctl.Restore(ctx, mycnf, mysqld, backupDir, *concurrency, extraEnv, map[string]string{}, logutil.NewConsoleLogger(), true, dbName) + params := mysqlctl.RestoreParams{ + Cnf: mycnf, + Mysqld: mysqld, + Logger: logutil.NewConsoleLogger(), + Concurrency: *concurrency, + HookExtraEnv: extraEnv, + LocalMetadata: map[string]string{}, + DeleteBeforeRestore: true, + DbName: dbName, + Dir: backupDir, + } + restorePos, err := mysqlctl.Restore(ctx, params) switch err { case nil: log.Infof("Successfully restored from backup at replication position %v", restorePos) @@ -360,7 +381,7 @@ func takeBackup(ctx context.Context, topoServer *topo.Server, backupStorage back // Now we can take a new backup. name := backupName(backupTime, tabletAlias) - if err := mysqlctl.Backup(ctx, mycnf, mysqld, logutil.NewConsoleLogger(), backupDir, name, *concurrency, extraEnv); err != nil { + if err := mysqlctl.Backup(ctx, backupDir, name, backupParams); err != nil { return fmt.Errorf("error taking backup: %v", err) } diff --git a/go/vt/mysqlctl/backup.go b/go/vt/mysqlctl/backup.go index 12a7651146c..324f61c3444 100644 --- a/go/vt/mysqlctl/backup.go +++ b/go/vt/mysqlctl/backup.go @@ -28,7 +28,6 @@ import ( "vitess.io/vitess/go/mysql" "vitess.io/vitess/go/sqlescape" "vitess.io/vitess/go/vt/log" - "vitess.io/vitess/go/vt/logutil" "vitess.io/vitess/go/vt/mysqlctl/backupstorage" "vitess.io/vitess/go/vt/proto/vtrpc" "vitess.io/vitess/go/vt/vterrors" @@ -90,7 +89,7 @@ var ( // - uses the BackupStorage service to store a new backup // - shuts down Mysqld during the backup // - remember if we were replicating, restore the exact same state -func Backup(ctx context.Context, cnf *Mycnf, mysqld MysqlDaemon, logger logutil.Logger, dir, name string, backupConcurrency int, hookExtraEnv map[string]string) error { +func Backup(ctx context.Context, dir, name string, params BackupParams) error { // Start the backup with the BackupStorage. bs, err := backupstorage.GetBackupStorage() if err != nil { @@ -108,7 +107,8 @@ func Backup(ctx context.Context, cnf *Mycnf, mysqld MysqlDaemon, logger logutil. } // Take the backup, and either AbortBackup or EndBackup. - usable, err := be.ExecuteBackup(ctx, cnf, mysqld, logger, bh, backupConcurrency, hookExtraEnv) + usable, err := be.ExecuteBackup(ctx, params, bh) + logger := params.Logger var finishErr error if usable { finishErr = bh.EndBackup(ctx) @@ -215,17 +215,16 @@ func removeExistingFiles(cnf *Mycnf) error { // Restore is the main entry point for backup restore. If there is no // appropriate backup on the BackupStorage, Restore logs an error // and returns ErrNoBackup. Any other error is returned. -func Restore( - ctx context.Context, - cnf *Mycnf, - mysqld MysqlDaemon, - dir string, - restoreConcurrency int, - hookExtraEnv map[string]string, - localMetadata map[string]string, - logger logutil.Logger, - deleteBeforeRestore bool, - dbName string) (mysql.Position, error) { +func Restore(ctx context.Context, params RestoreParams) (mysql.Position, error) { + + // extract params + cnf := params.Cnf + mysqld := params.Mysqld + logger := params.Logger + localMetadata := params.LocalMetadata + deleteBeforeRestore := params.DeleteBeforeRestore + dbName := params.DbName + dir := params.Dir rval := mysql.Position{} @@ -295,7 +294,8 @@ func Restore( if err != nil { return mysql.Position{}, vterrors.Wrap(err, "Failed to find restore engine") } - if rval, err = re.ExecuteRestore(ctx, cnf, mysqld, logger, dir, bh, restoreConcurrency, hookExtraEnv); err != nil { + + if rval, err = re.ExecuteRestore(ctx, params, bh); err != nil { return rval, err } diff --git a/go/vt/mysqlctl/backupengine.go b/go/vt/mysqlctl/backupengine.go index 50f07739dab..e65ecd97c55 100644 --- a/go/vt/mysqlctl/backupengine.go +++ b/go/vt/mysqlctl/backupengine.go @@ -29,6 +29,7 @@ import ( "vitess.io/vitess/go/vt/logutil" "vitess.io/vitess/go/vt/mysqlctl/backupstorage" "vitess.io/vitess/go/vt/proto/vtrpc" + "vitess.io/vitess/go/vt/topo" "vitess.io/vitess/go/vt/vterrors" ) @@ -39,13 +40,51 @@ var ( // BackupEngine is the interface to take a backup with a given engine. type BackupEngine interface { - ExecuteBackup(ctx context.Context, cnf *Mycnf, mysqld MysqlDaemon, logger logutil.Logger, bh backupstorage.BackupHandle, backupConcurrency int, hookExtraEnv map[string]string) (bool, error) + ExecuteBackup(ctx context.Context, params BackupParams, bh backupstorage.BackupHandle) (bool, error) ShouldDrainForBackup() bool } +// BackupParams is the struct that holds all params passed to ExecuteBackup +type BackupParams struct { + Cnf *Mycnf + Mysqld MysqlDaemon + Logger logutil.Logger + // Concurrency is the value of -concurrency flag given to Backup command + // It determines how many files are processed in parallel + Concurrency int + // Extra env variables for pre-backup and post-backup transform hooks + HookExtraEnv map[string]string + // TopoServer, Keyspace and Shard are used to discover master tablet + TopoServer *topo.Server + Keyspace string + Shard string +} + +// RestoreParams is the struct that holds all params passed to ExecuteRestore +type RestoreParams struct { + Cnf *Mycnf + Mysqld MysqlDaemon + Logger logutil.Logger + // Concurrency is the value of -restore_concurrency flag (init restore parameter) + // It determines how many files are processed in parallel + Concurrency int + // Extra env variables for pre-restore and post-restore transform hooks + HookExtraEnv map[string]string + // Metadata to write into database after restore. See PopulateMetadataTables + LocalMetadata map[string]string + // DeleteBeforeRestore tells us whether existing data should be deleted before + // restoring. This is always set to false when starting a tablet with -restore_from_backup, + // but is set to true when executing a RestoreFromBackup command on an already running vttablet + DeleteBeforeRestore bool + // Name of the managed database / schema + DbName string + // Directory location to search for a usable backup + Dir string +} + // RestoreEngine is the interface to restore a backup with a given engine. type RestoreEngine interface { - ExecuteRestore(ctx context.Context, cnf *Mycnf, mysqld MysqlDaemon, logger logutil.Logger, dir string, bh backupstorage.BackupHandle, restoreConcurrency int, hookExtraEnv map[string]string) (mysql.Position, error) + ExecuteRestore(ctx context.Context, params RestoreParams, bh backupstorage.BackupHandle) (mysql.Position, error) } // BackupRestoreEngine is a combination of BackupEngine and RestoreEngine. diff --git a/go/vt/mysqlctl/builtinbackupengine.go b/go/vt/mysqlctl/builtinbackupengine.go index eba829a16bc..e975c04bdfa 100644 --- a/go/vt/mysqlctl/builtinbackupengine.go +++ b/go/vt/mysqlctl/builtinbackupengine.go @@ -38,7 +38,10 @@ import ( "vitess.io/vitess/go/vt/logutil" "vitess.io/vitess/go/vt/mysqlctl/backupstorage" "vitess.io/vitess/go/vt/proto/vtrpc" + "vitess.io/vitess/go/vt/topo" + "vitess.io/vitess/go/vt/topo/topoproto" "vitess.io/vitess/go/vt/vterrors" + "vitess.io/vitess/go/vt/vttablet/tmclient" ) const ( @@ -236,7 +239,17 @@ func findFilesToBackup(cnf *Mycnf) ([]FileEntry, error) { // ExecuteBackup returns a boolean that indicates if the backup is usable, // and an overall error. -func (be *BuiltinBackupEngine) ExecuteBackup(ctx context.Context, cnf *Mycnf, mysqld MysqlDaemon, logger logutil.Logger, bh backupstorage.BackupHandle, backupConcurrency int, hookExtraEnv map[string]string) (bool, error) { +func (be *BuiltinBackupEngine) ExecuteBackup(ctx context.Context, params BackupParams, bh backupstorage.BackupHandle) (bool, error) { + + // extract all params from BackupParams + cnf := params.Cnf + mysqld := params.Mysqld + logger := params.Logger + backupConcurrency := params.Concurrency + hookExtraEnv := params.HookExtraEnv + topoServer := params.TopoServer + keyspace := params.Keyspace + shard := params.Shard logger.Infof("Hook: %v, Compress: %v", *backupStorageHook, *backupStorageCompress) @@ -307,6 +320,12 @@ func (be *BuiltinBackupEngine) ExecuteBackup(ctx context.Context, cnf *Mycnf, my return usable, vterrors.Wrap(err, "can't restart mysqld") } + // And set read-only mode + logger.Infof("resetting mysqld read-only to %v", readOnly) + if err := mysqld.SetReadOnly(readOnly); err != nil { + return usable, err + } + // Restore original mysqld state that we saved above. if semiSyncMaster || semiSyncSlave { // Only do this if one of them was on, since both being off could mean @@ -328,12 +347,36 @@ func (be *BuiltinBackupEngine) ExecuteBackup(ctx context.Context, cnf *Mycnf, my if err := WaitForSlaveStart(mysqld, slaveStartDeadline); err != nil { return usable, vterrors.Wrap(err, "slave is not restarting") } - } - // And set read-only mode - logger.Infof("resetting mysqld read-only to %v", readOnly) - if err := mysqld.SetReadOnly(readOnly); err != nil { - return usable, err + // Wait for a reliable value for SecondsBehindMaster from SlaveStatus() + + // We know that we stopped at replicationPosition. + // If MasterPosition is the same, that means no writes + // have happened to master, so we are up-to-date. + // Otherwise, we wait for replica's Position to change from + // the saved replicationPosition before proceeding + tmc := tmclient.NewTabletManagerClient() + defer tmc.Close() + remoteCtx, remoteCancel := context.WithTimeout(ctx, *topo.RemoteOperationTimeout) + defer remoteCancel() + + masterPos, err := getMasterPosition(remoteCtx, tmc, topoServer, keyspace, shard) + // If we are unable to get master position, return error. + if err != nil { + return usable, err + } + if !replicationPosition.Equal(masterPos) { + for { + status, err := mysqld.SlaveStatus() + if err != nil { + return usable, err + } + newPos := status.Position + if !newPos.Equal(replicationPosition) { + break + } + } + } } return usable, backupErr @@ -513,15 +556,13 @@ func (be *BuiltinBackupEngine) backupFile(ctx context.Context, cnf *Mycnf, mysql // ExecuteRestore restores from a backup. If the restore is successful // we return the position from which replication should start // otherwise an error is returned -func (be *BuiltinBackupEngine) ExecuteRestore( - ctx context.Context, - cnf *Mycnf, - mysqld MysqlDaemon, - logger logutil.Logger, - dir string, - bh backupstorage.BackupHandle, - restoreConcurrency int, - hookExtraEnv map[string]string) (mysql.Position, error) { +func (be *BuiltinBackupEngine) ExecuteRestore(ctx context.Context, params RestoreParams, bh backupstorage.BackupHandle) (mysql.Position, error) { + + cnf := params.Cnf + mysqld := params.Mysqld + logger := params.Logger + restoreConcurrency := params.Concurrency + hookExtraEnv := params.HookExtraEnv zeroPosition := mysql.Position{} var bm builtinBackupManifest @@ -683,6 +724,29 @@ func (be *BuiltinBackupEngine) ShouldDrainForBackup() bool { return true } +func getMasterPosition(ctx context.Context, tmc tmclient.TabletManagerClient, ts *topo.Server, keyspace, shard string) (mysql.Position, error) { + si, err := ts.GetShard(ctx, keyspace, shard) + if err != nil { + return mysql.Position{}, vterrors.Wrap(err, "can't read shard") + } + if topoproto.TabletAliasIsZero(si.MasterAlias) { + return mysql.Position{}, fmt.Errorf("shard %v/%v has no master", keyspace, shard) + } + ti, err := ts.GetTablet(ctx, si.MasterAlias) + if err != nil { + return mysql.Position{}, fmt.Errorf("can't get master tablet record %v: %v", topoproto.TabletAliasString(si.MasterAlias), err) + } + posStr, err := tmc.MasterPosition(ctx, ti.Tablet) + if err != nil { + return mysql.Position{}, fmt.Errorf("can't get master replication position: %v", err) + } + pos, err := mysql.DecodePosition(posStr) + if err != nil { + return mysql.Position{}, fmt.Errorf("can't decode master replication position %q: %v", posStr, err) + } + return pos, nil +} + func init() { BackupRestoreEngineMap["builtin"] = &BuiltinBackupEngine{} } diff --git a/go/vt/mysqlctl/xtrabackupengine.go b/go/vt/mysqlctl/xtrabackupengine.go index 7fdb5911495..a2a035b1bed 100644 --- a/go/vt/mysqlctl/xtrabackupengine.go +++ b/go/vt/mysqlctl/xtrabackupengine.go @@ -121,7 +121,12 @@ func closeFile(wc io.WriteCloser, fileName string, logger logutil.Logger, finalE // ExecuteBackup returns a boolean that indicates if the backup is usable, // and an overall error. -func (be *XtrabackupEngine) ExecuteBackup(ctx context.Context, cnf *Mycnf, mysqld MysqlDaemon, logger logutil.Logger, bh backupstorage.BackupHandle, backupConcurrency int, hookExtraEnv map[string]string) (complete bool, finalErr error) { +func (be *XtrabackupEngine) ExecuteBackup(ctx context.Context, params BackupParams, bh backupstorage.BackupHandle) (complete bool, finalErr error) { + // extract all params from BackupParams + cnf := params.Cnf + mysqld := params.Mysqld + logger := params.Logger + if *xtrabackupUser == "" { return false, vterrors.New(vtrpc.Code_INVALID_ARGUMENT, "xtrabackupUser must be specified.") } @@ -362,15 +367,11 @@ func (be *XtrabackupEngine) backupFiles(ctx context.Context, cnf *Mycnf, logger } // ExecuteRestore restores from a backup. Any error is returned. -func (be *XtrabackupEngine) ExecuteRestore( - ctx context.Context, - cnf *Mycnf, - mysqld MysqlDaemon, - logger logutil.Logger, - dir string, - bh backupstorage.BackupHandle, - restoreConcurrency int, - hookExtraEnv map[string]string) (mysql.Position, error) { +func (be *XtrabackupEngine) ExecuteRestore(ctx context.Context, params RestoreParams, bh backupstorage.BackupHandle) (mysql.Position, error) { + + cnf := params.Cnf + mysqld := params.Mysqld + logger := params.Logger zeroPosition := mysql.Position{} var bm xtraBackupManifest diff --git a/go/vt/vttablet/tabletmanager/healthcheck_test.go b/go/vt/vttablet/tabletmanager/healthcheck_test.go index 6420f48c632..88b8eec0e7c 100644 --- a/go/vt/vttablet/tabletmanager/healthcheck_test.go +++ b/go/vt/vttablet/tabletmanager/healthcheck_test.go @@ -888,6 +888,114 @@ func TestOldHealthCheck(t *testing.T) { } } +// TestBackupStateChange verifies that after backup we check +// the replication delay before setting REPLICA tablet to SERVING +func TestBackupStateChange(t *testing.T) { + ctx := context.Background() + agent := createTestAgent(ctx, t, nil) + + *degradedThreshold = 7 * time.Second + *unhealthyThreshold = 15 * time.Second + + if _, err := expectBroadcastData(agent.QueryServiceControl, true, "healthcheck not run yet", 0); err != nil { + t.Fatal(err) + } + if err := expectStateChange(agent.QueryServiceControl, true, topodatapb.TabletType_REPLICA); err != nil { + t.Fatal(err) + } + + agent.HealthReporter.(*fakeHealthCheck).reportReplicationDelay = 16 * time.Second + + // change to BACKUP, query service will turn off + if _, err := topotools.ChangeType(ctx, agent.TopoServer, agent.TabletAlias, topodatapb.TabletType_BACKUP); err != nil { + t.Fatal(err) + } + if err := agent.RefreshState(ctx); err != nil { + t.Fatal(err) + } + if agent.QueryServiceControl.IsServing() { + t.Errorf("Query service should NOT be running") + } + if err := expectStateChange(agent.QueryServiceControl, false, topodatapb.TabletType_BACKUP); err != nil { + t.Fatal(err) + } + // change back to REPLICA, query service should not start + // because replication delay > unhealthyThreshold + if _, err := topotools.ChangeType(ctx, agent.TopoServer, agent.TabletAlias, topodatapb.TabletType_REPLICA); err != nil { + t.Fatal(err) + } + if err := agent.RefreshState(ctx); err != nil { + t.Fatal(err) + } + if agent.QueryServiceControl.IsServing() { + t.Errorf("Query service should NOT be running") + } + if err := expectStateChange(agent.QueryServiceControl, false, topodatapb.TabletType_REPLICA); err != nil { + t.Fatal(err) + } + + // run healthcheck + // now query service should still be OFF + agent.runHealthCheck() + if agent.QueryServiceControl.IsServing() { + t.Errorf("Query service should NOT be running") + } +} + +// TestRestoreStateChange verifies that after restore we check +// the replication delay before setting REPLICA tablet to SERVING +func TestRestoreStateChange(t *testing.T) { + ctx := context.Background() + agent := createTestAgent(ctx, t, nil) + + *degradedThreshold = 7 * time.Second + *unhealthyThreshold = 15 * time.Second + + if _, err := expectBroadcastData(agent.QueryServiceControl, true, "healthcheck not run yet", 0); err != nil { + t.Fatal(err) + } + if err := expectStateChange(agent.QueryServiceControl, true, topodatapb.TabletType_REPLICA); err != nil { + t.Fatal(err) + } + + agent.HealthReporter.(*fakeHealthCheck).reportReplicationDelay = 16 * time.Second + + // change to RESTORE, query service will turn off + if _, err := topotools.ChangeType(ctx, agent.TopoServer, agent.TabletAlias, topodatapb.TabletType_RESTORE); err != nil { + t.Fatal(err) + } + if err := agent.RefreshState(ctx); err != nil { + t.Fatal(err) + } + if agent.QueryServiceControl.IsServing() { + t.Errorf("Query service should NOT be running") + } + if err := expectStateChange(agent.QueryServiceControl, false, topodatapb.TabletType_RESTORE); err != nil { + t.Fatal(err) + } + // change back to REPLICA, query service should not start + // because replication delay > unhealthyThreshold + if _, err := topotools.ChangeType(ctx, agent.TopoServer, agent.TabletAlias, topodatapb.TabletType_REPLICA); err != nil { + t.Fatal(err) + } + if err := agent.RefreshState(ctx); err != nil { + t.Fatal(err) + } + if agent.QueryServiceControl.IsServing() { + t.Errorf("Query service should NOT be running") + } + if err := expectStateChange(agent.QueryServiceControl, false, topodatapb.TabletType_REPLICA); err != nil { + t.Fatal(err) + } + + // run healthcheck + // now query service should still be OFF + agent.runHealthCheck() + if agent.QueryServiceControl.IsServing() { + t.Errorf("Query service should NOT be running") + } +} + // expectBroadcastData checks that runHealthCheck() broadcasted the expected // stats (going the value for secondsBehindMaster). func expectBroadcastData(qsc tabletserver.Controller, serving bool, healthError string, secondsBehindMaster uint32) (*tabletservermock.BroadcastData, error) { @@ -925,7 +1033,7 @@ func expectStateChange(qsc tabletserver.Controller, serving bool, tabletType top } got := <-qsc.(*tabletservermock.Controller).StateChanges if !reflect.DeepEqual(got, want) { - return fmt.Errorf("unexpected state change. got: %v want: %v got", got, want) + return fmt.Errorf("unexpected state change. got: %v want: %v", got, want) } return nil } diff --git a/go/vt/vttablet/tabletmanager/restore.go b/go/vt/vttablet/tabletmanager/restore.go index d01a3ad4fd6..6cd629a50ce 100644 --- a/go/vt/vttablet/tabletmanager/restore.go +++ b/go/vt/vttablet/tabletmanager/restore.go @@ -21,7 +21,9 @@ import ( "fmt" "time" + "vitess.io/vitess/go/vt/topo" "vitess.io/vitess/go/vt/vterrors" + "vitess.io/vitess/go/vt/vttablet/tmclient" "golang.org/x/net/context" "vitess.io/vitess/go/vt/log" @@ -83,11 +85,23 @@ func (agent *ActionAgent) restoreDataLocked(ctx context.Context, logger logutil. tablet := agent.Tablet() dir := fmt.Sprintf("%v/%v", tablet.Keyspace, tablet.Shard) + params := mysqlctl.RestoreParams{ + Cnf: agent.Cnf, + Mysqld: agent.MysqlDaemon, + Logger: logger, + Concurrency: *restoreConcurrency, + HookExtraEnv: agent.hookExtraEnv(), + LocalMetadata: localMetadata, + DeleteBeforeRestore: deleteBeforeRestore, + DbName: topoproto.TabletDbName(tablet), + Dir: dir, + } + // Loop until a backup exists, unless we were told to give up immediately. var pos mysql.Position var err error for { - pos, err = mysqlctl.Restore(ctx, agent.Cnf, agent.MysqlDaemon, dir, *restoreConcurrency, agent.hookExtraEnv(), localMetadata, logger, deleteBeforeRestore, topoproto.TabletDbName(tablet)) + pos, err = mysqlctl.Restore(ctx, params) if waitForBackupInterval == 0 { break } @@ -205,6 +219,39 @@ func (agent *ActionAgent) startReplication(ctx context.Context, pos mysql.Positi if err := agent.MysqlDaemon.SetMaster(ctx, topoproto.MysqlHostname(ti.Tablet), int(topoproto.MysqlPort(ti.Tablet)), false /* slaveStopBefore */, true /* slaveStartAfter */); err != nil { return vterrors.Wrap(err, "MysqlDaemon.SetMaster failed") } + + // wait for reliable seconds behind master + // we have pos where we want to resume from + // if MasterPosition is the same, that means no writes + // have happened to master, so we are up-to-date + // otherwise, wait for replica's Position to change from + // the initial pos before proceeding + tmc := tmclient.NewTabletManagerClient() + defer tmc.Close() + remoteCtx, remoteCancel := context.WithTimeout(ctx, *topo.RemoteOperationTimeout) + defer remoteCancel() + posStr, err := tmc.MasterPosition(remoteCtx, ti.Tablet) + if err != nil { + return vterrors.Wrap(err, "can't get master replication position") + } + masterPos, err := mysql.DecodePosition(posStr) + if err != nil { + return vterrors.Wrapf(err, "can't decode master replication position: %q", posStr) + } + + if !pos.Equal(masterPos) { + for { + status, err := agent.MysqlDaemon.SlaveStatus() + if err != nil { + return vterrors.Wrap(err, "can't get slave status") + } + newPos := status.Position + if !newPos.Equal(pos) { + break + } + } + } + return nil } diff --git a/go/vt/vttablet/tabletmanager/rpc_backup.go b/go/vt/vttablet/tabletmanager/rpc_backup.go index d7387b7c941..5ebf261a4e6 100644 --- a/go/vt/vttablet/tabletmanager/rpc_backup.go +++ b/go/vt/vttablet/tabletmanager/rpc_backup.go @@ -99,7 +99,18 @@ func (agent *ActionAgent) Backup(ctx context.Context, concurrency int, logger lo // now we can run the backup dir := fmt.Sprintf("%v/%v", tablet.Keyspace, tablet.Shard) name := fmt.Sprintf("%v.%v", time.Now().UTC().Format("2006-01-02.150405"), topoproto.TabletAliasString(tablet.Alias)) - returnErr := mysqlctl.Backup(ctx, agent.Cnf, agent.MysqlDaemon, l, dir, name, concurrency, agent.hookExtraEnv()) + backupParams := mysqlctl.BackupParams{ + Cnf: agent.Cnf, + Mysqld: agent.MysqlDaemon, + Logger: l, + Concurrency: concurrency, + HookExtraEnv: agent.hookExtraEnv(), + TopoServer: agent.TopoServer, + Keyspace: tablet.Keyspace, + Shard: tablet.Shard, + } + + returnErr := mysqlctl.Backup(ctx, dir, name, backupParams) if engine.ShouldDrainForBackup() { bgCtx := context.Background() diff --git a/go/vt/vttablet/tabletmanager/state_change.go b/go/vt/vttablet/tabletmanager/state_change.go index 4654f8f32c2..3d08d3a7f22 100644 --- a/go/vt/vttablet/tabletmanager/state_change.go +++ b/go/vt/vttablet/tabletmanager/state_change.go @@ -203,7 +203,10 @@ func (agent *ActionAgent) changeCallback(ctx context.Context, oldTablet, newTabl // we're going to use it. var shardInfo *topo.ShardInfo var err error + // this is just for logging var disallowQueryReason string + // this is actually used to set state + var disallowQueryService string var blacklistedTables []string updateBlacklistedTables := true if allowQuery { @@ -212,10 +215,35 @@ func (agent *ActionAgent) changeCallback(ctx context.Context, oldTablet, newTabl log.Errorf("Cannot read shard for this tablet %v, might have inaccurate SourceShards and TabletControls: %v", newTablet.Alias, err) updateBlacklistedTables = false } else { - if newTablet.Type == topodatapb.TabletType_MASTER { - if len(shardInfo.SourceShards) > 0 { - allowQuery = false - disallowQueryReason = "master tablet with filtered replication on" + if oldTablet.Type == topodatapb.TabletType_RESTORE { + // always start as NON-SERVING after a restore because + // healthcheck has not been initialized yet + allowQuery = false + // setting disallowQueryService permanently turns off query service + // since we want it to be temporary (until tablet is healthy) we don't set it + // disallowQueryReason is only used for logging + disallowQueryReason = "after restore from backup" + } else { + if newTablet.Type == topodatapb.TabletType_MASTER { + if len(shardInfo.SourceShards) > 0 { + allowQuery = false + disallowQueryReason = "master tablet with filtered replication on" + disallowQueryService = disallowQueryReason + } + } else { + replicationDelay, healthErr := agent.HealthReporter.Report(true, true) + if healthErr != nil { + allowQuery = false + disallowQueryReason = "unable to get health" + } else { + agent.mutex.Lock() + agent._replicationDelay = replicationDelay + agent.mutex.Unlock() + if agent._replicationDelay > *unhealthyThreshold { + allowQuery = false + disallowQueryReason = "replica tablet with unhealthy replication lag" + } + } } } srvKeyspace, err := agent.TopoServer.GetSrvKeyspace(ctx, newTablet.Alias.Cell, newTablet.Keyspace) @@ -233,6 +261,7 @@ func (agent *ActionAgent) changeCallback(ctx context.Context, oldTablet, newTabl if tabletControl.QueryServiceDisabled { allowQuery = false disallowQueryReason = "TabletControl.DisableQueryService set" + disallowQueryService = disallowQueryReason } break } @@ -248,8 +277,9 @@ func (agent *ActionAgent) changeCallback(ctx context.Context, oldTablet, newTabl } } else { disallowQueryReason = fmt.Sprintf("not a serving tablet type(%v)", newTablet.Type) + disallowQueryService = disallowQueryReason } - agent.setServicesDesiredState(disallowQueryReason, runUpdateStream) + agent.setServicesDesiredState(disallowQueryService, runUpdateStream) if updateBlacklistedTables { if err := agent.loadBlacklistRules(newTablet, blacklistedTables); err != nil { // FIXME(alainjobart) how to handle this error? diff --git a/go/vt/wrangler/testlib/backup_test.go b/go/vt/wrangler/testlib/backup_test.go index d4d68690ee2..9916268b083 100644 --- a/go/vt/wrangler/testlib/backup_test.go +++ b/go/vt/wrangler/testlib/backup_test.go @@ -94,10 +94,26 @@ func TestBackupRestore(t *testing.T) { t.Fatalf("failed to write file db.opt: %v", err) } - // create a master tablet, not started, just for shard health + // create a master tablet, set its master position master := NewFakeTablet(t, wr, "cell1", 0, topodatapb.TabletType_MASTER, db) + master.FakeMysqlDaemon.ReadOnly = false + master.FakeMysqlDaemon.Replicating = false + master.FakeMysqlDaemon.CurrentMasterPosition = mysql.Position{ + GTIDSet: mysql.MariadbGTIDSet{ + mysql.MariadbGTID{ + Domain: 2, + Server: 123, + Sequence: 457, + }, + }, + } + + // start master so that slave can fetch master position from it + master.StartActionLoop(t, wr) + defer master.StopActionLoop(t) // create a single tablet, set it up so we can do backups + // set its position same as that of master so that backup doesn't wait for catchup sourceTablet := NewFakeTablet(t, wr, "cell1", 1, topodatapb.TabletType_REPLICA, db) sourceTablet.FakeMysqlDaemon.ReadOnly = true sourceTablet.FakeMysqlDaemon.Replicating = true