Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
27 changes: 24 additions & 3 deletions go/cmd/vtbackup/vtbackup.go
Original file line number Diff line number Diff line change
Expand Up @@ -240,6 +240,16 @@ func takeBackup(ctx context.Context, topoServer *topo.Server, backupStorage back
dbName = fmt.Sprintf("vt_%s", *initKeyspace)
}

backupParams := mysqlctl.BackupParams{
Cnf: mycnf,
Mysqld: mysqld,
Logger: logutil.NewConsoleLogger(),
Concurrency: *concurrency,
HookExtraEnv: extraEnv,
TopoServer: topoServer,
Keyspace: *initKeyspace,
Shard: *initShard,
}
// In initial_backup mode, just take a backup of this empty database.
if *initialBackup {
// Take a backup of this empty DB without restoring anything.
Expand All @@ -257,15 +267,26 @@ func takeBackup(ctx context.Context, topoServer *topo.Server, backupStorage back
}
// Now we're ready to take the backup.
name := backupName(time.Now(), tabletAlias)
if err := mysqlctl.Backup(ctx, mycnf, mysqld, logutil.NewConsoleLogger(), backupDir, name, *concurrency, extraEnv); err != nil {
if err := mysqlctl.Backup(ctx, backupDir, name, backupParams); err != nil {
return fmt.Errorf("backup failed: %v", err)
}
log.Info("Initial backup successful.")
return nil
}

log.Infof("Restoring latest backup from directory %v", backupDir)
restorePos, err := mysqlctl.Restore(ctx, mycnf, mysqld, backupDir, *concurrency, extraEnv, map[string]string{}, logutil.NewConsoleLogger(), true, dbName)
params := mysqlctl.RestoreParams{
Cnf: mycnf,
Mysqld: mysqld,
Logger: logutil.NewConsoleLogger(),
Concurrency: *concurrency,
HookExtraEnv: extraEnv,
LocalMetadata: map[string]string{},
DeleteBeforeRestore: true,
DbName: dbName,
Dir: backupDir,
}
restorePos, err := mysqlctl.Restore(ctx, params)
switch err {
case nil:
log.Infof("Successfully restored from backup at replication position %v", restorePos)
Expand Down Expand Up @@ -360,7 +381,7 @@ func takeBackup(ctx context.Context, topoServer *topo.Server, backupStorage back

// Now we can take a new backup.
name := backupName(backupTime, tabletAlias)
if err := mysqlctl.Backup(ctx, mycnf, mysqld, logutil.NewConsoleLogger(), backupDir, name, *concurrency, extraEnv); err != nil {
if err := mysqlctl.Backup(ctx, backupDir, name, backupParams); err != nil {
return fmt.Errorf("error taking backup: %v", err)
}

Expand Down
30 changes: 15 additions & 15 deletions go/vt/mysqlctl/backup.go
Original file line number Diff line number Diff line change
Expand Up @@ -28,7 +28,6 @@ import (
"vitess.io/vitess/go/mysql"
"vitess.io/vitess/go/sqlescape"
"vitess.io/vitess/go/vt/log"
"vitess.io/vitess/go/vt/logutil"
"vitess.io/vitess/go/vt/mysqlctl/backupstorage"
"vitess.io/vitess/go/vt/proto/vtrpc"
"vitess.io/vitess/go/vt/vterrors"
Expand Down Expand Up @@ -90,7 +89,7 @@ var (
// - uses the BackupStorage service to store a new backup
// - shuts down Mysqld during the backup
// - remember if we were replicating, restore the exact same state
func Backup(ctx context.Context, cnf *Mycnf, mysqld MysqlDaemon, logger logutil.Logger, dir, name string, backupConcurrency int, hookExtraEnv map[string]string) error {
func Backup(ctx context.Context, dir, name string, params BackupParams) error {
// Start the backup with the BackupStorage.
bs, err := backupstorage.GetBackupStorage()
if err != nil {
Expand All @@ -108,7 +107,8 @@ func Backup(ctx context.Context, cnf *Mycnf, mysqld MysqlDaemon, logger logutil.
}

// Take the backup, and either AbortBackup or EndBackup.
usable, err := be.ExecuteBackup(ctx, cnf, mysqld, logger, bh, backupConcurrency, hookExtraEnv)
usable, err := be.ExecuteBackup(ctx, params, bh)
logger := params.Logger
var finishErr error
if usable {
finishErr = bh.EndBackup(ctx)
Expand Down Expand Up @@ -215,17 +215,16 @@ func removeExistingFiles(cnf *Mycnf) error {
// Restore is the main entry point for backup restore. If there is no
// appropriate backup on the BackupStorage, Restore logs an error
// and returns ErrNoBackup. Any other error is returned.
func Restore(
ctx context.Context,
cnf *Mycnf,
mysqld MysqlDaemon,
dir string,
restoreConcurrency int,
hookExtraEnv map[string]string,
localMetadata map[string]string,
logger logutil.Logger,
deleteBeforeRestore bool,
dbName string) (mysql.Position, error) {
func Restore(ctx context.Context, params RestoreParams) (mysql.Position, error) {

// extract params
cnf := params.Cnf
mysqld := params.Mysqld
logger := params.Logger
localMetadata := params.LocalMetadata
deleteBeforeRestore := params.DeleteBeforeRestore
dbName := params.DbName
dir := params.Dir

rval := mysql.Position{}

Expand Down Expand Up @@ -295,7 +294,8 @@ func Restore(
if err != nil {
return mysql.Position{}, vterrors.Wrap(err, "Failed to find restore engine")
}
if rval, err = re.ExecuteRestore(ctx, cnf, mysqld, logger, dir, bh, restoreConcurrency, hookExtraEnv); err != nil {

if rval, err = re.ExecuteRestore(ctx, params, bh); err != nil {
return rval, err
}

Expand Down
43 changes: 41 additions & 2 deletions go/vt/mysqlctl/backupengine.go
Original file line number Diff line number Diff line change
Expand Up @@ -29,6 +29,7 @@ import (
"vitess.io/vitess/go/vt/logutil"
"vitess.io/vitess/go/vt/mysqlctl/backupstorage"
"vitess.io/vitess/go/vt/proto/vtrpc"
"vitess.io/vitess/go/vt/topo"
"vitess.io/vitess/go/vt/vterrors"
)

Expand All @@ -39,13 +40,51 @@ var (

// BackupEngine is the interface to take a backup with a given engine.
type BackupEngine interface {
ExecuteBackup(ctx context.Context, cnf *Mycnf, mysqld MysqlDaemon, logger logutil.Logger, bh backupstorage.BackupHandle, backupConcurrency int, hookExtraEnv map[string]string) (bool, error)
ExecuteBackup(ctx context.Context, params BackupParams, bh backupstorage.BackupHandle) (bool, error)
ShouldDrainForBackup() bool
}

// BackupParams is the struct that holds all params passed to ExecuteBackup
type BackupParams struct {
Cnf *Mycnf
Mysqld MysqlDaemon
Logger logutil.Logger
// Concurrency is the value of -concurrency flag given to Backup command
// It determines how many files are processed in parallel
Concurrency int
// Extra env variables for pre-backup and post-backup transform hooks
HookExtraEnv map[string]string
// TopoServer, Keyspace and Shard are used to discover master tablet
TopoServer *topo.Server
Keyspace string
Shard string
}

// RestoreParams is the struct that holds all params passed to ExecuteRestore
type RestoreParams struct {

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

For BackupParams and RestoreParams docs on the non-obvious args would be ✨. (To me the non-obvious one is mostly HookExtraEnv though I'm just assuming I understand DbName, LocalMetadata, and what the Keyspace/Shard pair is for).

Copy link
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

will do

Cnf *Mycnf
Mysqld MysqlDaemon
Logger logutil.Logger
// Concurrency is the value of -restore_concurrency flag (init restore parameter)
// It determines how many files are processed in parallel
Concurrency int
// Extra env variables for pre-restore and post-restore transform hooks
HookExtraEnv map[string]string
// Metadata to write into database after restore. See PopulateMetadataTables
LocalMetadata map[string]string
// DeleteBeforeRestore tells us whether existing data should be deleted before
// restoring. This is always set to false when starting a tablet with -restore_from_backup,
// but is set to true when executing a RestoreFromBackup command on an already running vttablet
DeleteBeforeRestore bool
// Name of the managed database / schema
DbName string
// Directory location to search for a usable backup
Dir string
}

// RestoreEngine is the interface to restore a backup with a given engine.
type RestoreEngine interface {
ExecuteRestore(ctx context.Context, cnf *Mycnf, mysqld MysqlDaemon, logger logutil.Logger, dir string, bh backupstorage.BackupHandle, restoreConcurrency int, hookExtraEnv map[string]string) (mysql.Position, error)
ExecuteRestore(ctx context.Context, params RestoreParams, bh backupstorage.BackupHandle) (mysql.Position, error)
}

// BackupRestoreEngine is a combination of BackupEngine and RestoreEngine.
Expand Down
94 changes: 79 additions & 15 deletions go/vt/mysqlctl/builtinbackupengine.go
Original file line number Diff line number Diff line change
Expand Up @@ -38,7 +38,10 @@ import (
"vitess.io/vitess/go/vt/logutil"
"vitess.io/vitess/go/vt/mysqlctl/backupstorage"
"vitess.io/vitess/go/vt/proto/vtrpc"
"vitess.io/vitess/go/vt/topo"
"vitess.io/vitess/go/vt/topo/topoproto"
"vitess.io/vitess/go/vt/vterrors"
"vitess.io/vitess/go/vt/vttablet/tmclient"
)

const (
Expand Down Expand Up @@ -236,7 +239,17 @@ func findFilesToBackup(cnf *Mycnf) ([]FileEntry, error) {

// ExecuteBackup returns a boolean that indicates if the backup is usable,
// and an overall error.
func (be *BuiltinBackupEngine) ExecuteBackup(ctx context.Context, cnf *Mycnf, mysqld MysqlDaemon, logger logutil.Logger, bh backupstorage.BackupHandle, backupConcurrency int, hookExtraEnv map[string]string) (bool, error) {
func (be *BuiltinBackupEngine) ExecuteBackup(ctx context.Context, params BackupParams, bh backupstorage.BackupHandle) (bool, error) {

// extract all params from BackupParams
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

i don't think it's necessary to extract all the params into separate vars, unless you need a few of the vars to overwrite, and then i'd only make those.

Copy link
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I did this for 2 reasons:

  • it serves as documentation of what params we actually use from the object in this function
  • it avoided making a whole lot of name changes through out the functions

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

i think the docs belong in the params struct docs. if it isn't used, then there was not point of putting it in the struct.

if we never change the lines, then the code will get progressively more cluttered.

Copy link
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

that params struct is used to pass the params down two levels of calls. Not all params are used at both levels so it is a union of the two sets of params. Do you still feel like we should not extract the params into vars?

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

$0.02: I find that the backup param makes ExecuteBackup & RestoreBackup much easier to read.

If the primary reason for extracting args is to minimize churn in this review I would suggest a follow up where we move to dereferencing params instead of extracting them like this. That would leave the interface cleaner / more easy to scan and remove this noisy block of param extraction.

cnf := params.Cnf
mysqld := params.Mysqld
logger := params.Logger
backupConcurrency := params.Concurrency
hookExtraEnv := params.HookExtraEnv
topoServer := params.TopoServer
keyspace := params.Keyspace
shard := params.Shard

logger.Infof("Hook: %v, Compress: %v", *backupStorageHook, *backupStorageCompress)

Expand Down Expand Up @@ -307,6 +320,12 @@ func (be *BuiltinBackupEngine) ExecuteBackup(ctx context.Context, cnf *Mycnf, my
return usable, vterrors.Wrap(err, "can't restart mysqld")
}

// And set read-only mode
logger.Infof("resetting mysqld read-only to %v", readOnly)
if err := mysqld.SetReadOnly(readOnly); err != nil {
return usable, err
}

// Restore original mysqld state that we saved above.
if semiSyncMaster || semiSyncSlave {
// Only do this if one of them was on, since both being off could mean
Expand All @@ -328,12 +347,36 @@ func (be *BuiltinBackupEngine) ExecuteBackup(ctx context.Context, cnf *Mycnf, my
if err := WaitForSlaveStart(mysqld, slaveStartDeadline); err != nil {
return usable, vterrors.Wrap(err, "slave is not restarting")
}
}

// And set read-only mode
logger.Infof("resetting mysqld read-only to %v", readOnly)
if err := mysqld.SetReadOnly(readOnly); err != nil {
return usable, err
// Wait for a reliable value for SecondsBehindMaster from SlaveStatus()

// We know that we stopped at replicationPosition.
// If MasterPosition is the same, that means no writes
// have happened to master, so we are up-to-date.
// Otherwise, we wait for replica's Position to change from
// the saved replicationPosition before proceeding
tmc := tmclient.NewTabletManagerClient()
defer tmc.Close()
remoteCtx, remoteCancel := context.WithTimeout(ctx, *topo.RemoteOperationTimeout)
defer remoteCancel()

masterPos, err := getMasterPosition(remoteCtx, tmc, topoServer, keyspace, shard)
// If we are unable to get master position, return error.
if err != nil {
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This is a new failure mode that we need to be aware, but I think it makes sense to fail if we can't get master position.

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Agreed. Also the same type of tablet/mysql loss is something that needs to be handled already by an operator as vttablet/mysqld could go down and fall behind the ability to catch up for several reasons already, e.g., network partition. This failure mode should be well covered by normal tooling.

The thing I'm more interested in actually is going to be

when trying to change tablet type and state after completing a backup, if the tablet is lagged, it stays in BACKUP type until it is caught up and only then transitions into REPLICA type. This seems to be an artifact of how state_change has been implemented in the code.

return usable, err
}
if !replicationPosition.Equal(masterPos) {
for {
status, err := mysqld.SlaveStatus()
if err != nil {
return usable, err
}
newPos := status.Position
if !newPos.Equal(replicationPosition) {
break
}
}
}
}

return usable, backupErr
Expand Down Expand Up @@ -513,15 +556,13 @@ func (be *BuiltinBackupEngine) backupFile(ctx context.Context, cnf *Mycnf, mysql
// ExecuteRestore restores from a backup. If the restore is successful
// we return the position from which replication should start
// otherwise an error is returned
func (be *BuiltinBackupEngine) ExecuteRestore(
ctx context.Context,
cnf *Mycnf,
mysqld MysqlDaemon,
logger logutil.Logger,
dir string,
bh backupstorage.BackupHandle,
restoreConcurrency int,
hookExtraEnv map[string]string) (mysql.Position, error) {
func (be *BuiltinBackupEngine) ExecuteRestore(ctx context.Context, params RestoreParams, bh backupstorage.BackupHandle) (mysql.Position, error) {

cnf := params.Cnf
mysqld := params.Mysqld
logger := params.Logger
restoreConcurrency := params.Concurrency
hookExtraEnv := params.HookExtraEnv

zeroPosition := mysql.Position{}
var bm builtinBackupManifest
Expand Down Expand Up @@ -683,6 +724,29 @@ func (be *BuiltinBackupEngine) ShouldDrainForBackup() bool {
return true
}

func getMasterPosition(ctx context.Context, tmc tmclient.TabletManagerClient, ts *topo.Server, keyspace, shard string) (mysql.Position, error) {
si, err := ts.GetShard(ctx, keyspace, shard)
if err != nil {
return mysql.Position{}, vterrors.Wrap(err, "can't read shard")
}
if topoproto.TabletAliasIsZero(si.MasterAlias) {
return mysql.Position{}, fmt.Errorf("shard %v/%v has no master", keyspace, shard)
}
ti, err := ts.GetTablet(ctx, si.MasterAlias)
if err != nil {
return mysql.Position{}, fmt.Errorf("can't get master tablet record %v: %v", topoproto.TabletAliasString(si.MasterAlias), err)
}
posStr, err := tmc.MasterPosition(ctx, ti.Tablet)
if err != nil {
return mysql.Position{}, fmt.Errorf("can't get master replication position: %v", err)
}
pos, err := mysql.DecodePosition(posStr)
if err != nil {
return mysql.Position{}, fmt.Errorf("can't decode master replication position %q: %v", posStr, err)
}
return pos, nil
}

func init() {
BackupRestoreEngineMap["builtin"] = &BuiltinBackupEngine{}
}
21 changes: 11 additions & 10 deletions go/vt/mysqlctl/xtrabackupengine.go
Original file line number Diff line number Diff line change
Expand Up @@ -121,7 +121,12 @@ func closeFile(wc io.WriteCloser, fileName string, logger logutil.Logger, finalE

// ExecuteBackup returns a boolean that indicates if the backup is usable,
// and an overall error.
func (be *XtrabackupEngine) ExecuteBackup(ctx context.Context, cnf *Mycnf, mysqld MysqlDaemon, logger logutil.Logger, bh backupstorage.BackupHandle, backupConcurrency int, hookExtraEnv map[string]string) (complete bool, finalErr error) {
func (be *XtrabackupEngine) ExecuteBackup(ctx context.Context, params BackupParams, bh backupstorage.BackupHandle) (complete bool, finalErr error) {
// extract all params from BackupParams
cnf := params.Cnf
mysqld := params.Mysqld
logger := params.Logger

if *xtrabackupUser == "" {
return false, vterrors.New(vtrpc.Code_INVALID_ARGUMENT, "xtrabackupUser must be specified.")
}
Expand Down Expand Up @@ -362,15 +367,11 @@ func (be *XtrabackupEngine) backupFiles(ctx context.Context, cnf *Mycnf, logger
}

// ExecuteRestore restores from a backup. Any error is returned.
func (be *XtrabackupEngine) ExecuteRestore(
ctx context.Context,
cnf *Mycnf,
mysqld MysqlDaemon,
logger logutil.Logger,
dir string,
bh backupstorage.BackupHandle,
restoreConcurrency int,
hookExtraEnv map[string]string) (mysql.Position, error) {
func (be *XtrabackupEngine) ExecuteRestore(ctx context.Context, params RestoreParams, bh backupstorage.BackupHandle) (mysql.Position, error) {

cnf := params.Cnf
mysqld := params.Mysqld
logger := params.Logger

zeroPosition := mysql.Position{}
var bm xtraBackupManifest
Expand Down
Loading