Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
11 changes: 11 additions & 0 deletions go/vt/mysqlctl/fakemysqldaemon/fakemysqldaemon.go
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,7 @@ import (
"fmt"
"reflect"
"strings"
"sync"
"time"

"context"
Expand All @@ -39,6 +40,7 @@ import (
// FakeMysqlDaemon implements MysqlDaemon and allows the user to fake
// everything.
type FakeMysqlDaemon struct {
mu sync.Mutex
// db is the fake SQL DB we may use for some queries.
db *fakesqldb.DB

Expand Down Expand Up @@ -248,11 +250,20 @@ func (fmd *FakeMysqlDaemon) GetMysqlPort() (int32, error) {
return fmd.MysqlPort.Get(), nil
}

// CurrentMasterPositionLocked is thread-safe
func (fmd *FakeMysqlDaemon) CurrentMasterPositionLocked(pos mysql.Position) {
fmd.mu.Lock()
defer fmd.mu.Unlock()
fmd.CurrentMasterPosition = pos
}

// ReplicationStatus is part of the MysqlDaemon interface
func (fmd *FakeMysqlDaemon) ReplicationStatus() (mysql.ReplicationStatus, error) {
if fmd.ReplicationStatusError != nil {
return mysql.ReplicationStatus{}, fmd.ReplicationStatusError
}
fmd.mu.Lock()
defer fmd.mu.Unlock()
return mysql.ReplicationStatus{
Position: fmd.CurrentMasterPosition,
FilePosition: fmd.CurrentMasterFilePosition,
Expand Down
7 changes: 5 additions & 2 deletions go/vt/vttablet/tabletmanager/restore.go
Original file line number Diff line number Diff line change
Expand Up @@ -89,6 +89,9 @@ func (tm *TabletManager) RestoreData(ctx context.Context, logger logutil.Logger,
}
}()
err := tm.restoreDataLocked(ctx, logger, waitForBackupInterval, deleteBeforeRestore)
if err != nil {
return err
}
// Tell Orchestrator we're no longer stopped on purpose.
// Do this in the background, as it's best-effort.
go func() {
Expand All @@ -99,7 +102,7 @@ func (tm *TabletManager) RestoreData(ctx context.Context, logger logutil.Logger,
log.Warningf("Orchestrator EndMaintenance failed: %v", err)
}
}()
return err
return nil
}

func (tm *TabletManager) restoreDataLocked(ctx context.Context, logger logutil.Logger, waitForBackupInterval time.Duration, deleteBeforeRestore bool) error {
Expand Down Expand Up @@ -497,7 +500,7 @@ func (tm *TabletManager) startReplication(ctx context.Context, pos mysql.Positio

// If active reparents are disabled, we don't restart replication. So it makes no sense to wait for an update on the replica.
// Return immediately.
if !*mysqlctl.DisableActiveReparents {
if *mysqlctl.DisableActiveReparents {
return nil
}
// wait for reliable seconds behind master
Expand Down
21 changes: 11 additions & 10 deletions go/vt/vttablet/tabletmanager/rpc_backup.go
Original file line number Diff line number Diff line change
Expand Up @@ -133,17 +133,18 @@ func (tm *TabletManager) Backup(ctx context.Context, concurrency int, logger log
l.Errorf("mysql backup command returned error: %v", returnErr)
}
returnErr = err
} else {
// Tell Orchestrator we're no longer stopped on purpose.
// Do this in the background, as it's best-effort.
go func() {
if tm.orc == nil {
return
}
if err := tm.orc.EndMaintenance(tm.Tablet()); err != nil {
logger.Warningf("Orchestrator EndMaintenance failed: %v", err)
}
}()
}
// Tell Orchestrator we're no longer stopped on purpose.
// Do this in the background, as it's best-effort.
go func() {
if tm.orc == nil {
return
}
if err := tm.orc.EndMaintenance(tm.Tablet()); err != nil {
logger.Warningf("Orchestrator EndMaintenance failed: %v", err)
}
}()
}

return returnErr
Expand Down
11 changes: 10 additions & 1 deletion go/vt/vttablet/tabletmanager/rpc_replication.go
Original file line number Diff line number Diff line change
Expand Up @@ -464,7 +464,16 @@ func (tm *TabletManager) UndoDemoteMaster(ctx context.Context) error {
if err := tm.QueryServiceControl.SetServingType(tablet.Type, logutil.ProtoToTime(tablet.MasterTermStartTime), true, ""); err != nil {
return vterrors.Wrap(err, "SetServingType(serving=true) failed")
}

// Tell Orchestrator we're no longer stopped on purpose.
// Do this in the background, as it's best-effort.
go func() {
if tm.orc == nil {
return
}
if err := tm.orc.EndMaintenance(tm.Tablet()); err != nil {
log.Warningf("Orchestrator EndMaintenance failed: %v", err)
}
}()
return nil
}

Expand Down
200 changes: 200 additions & 0 deletions go/vt/wrangler/testlib/backup_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -245,6 +245,206 @@ func TestBackupRestore(t *testing.T) {
assert.True(t, master.FakeMysqlDaemon.Running)
}

// TestBackupRestoreLagged tests the changes made in https://github.com/vitessio/vitess/pull/5000
// While doing a backup or a restore, we wait for a change of the replica's position before completing the action
// This is because otherwise SecondsBehindMaster is not accurate and the tablet may go into SERVING when it should not
func TestBackupRestoreLagged(t *testing.T) {
delay := discovery.GetTabletPickerRetryDelay()
defer func() {
discovery.SetTabletPickerRetryDelay(delay)
}()
discovery.SetTabletPickerRetryDelay(5 * time.Millisecond)

// Initialize our environment
ctx := context.Background()
db := fakesqldb.New(t)
defer db.Close()
ts := memorytopo.NewServer("cell1", "cell2")
wr := wrangler.New(logutil.NewConsoleLogger(), ts, tmclient.NewTabletManagerClient())
vp := NewVtctlPipe(t, ts)
defer vp.Close()

// Set up mock query results.
db.AddQuery("CREATE DATABASE IF NOT EXISTS _vt", &sqltypes.Result{})
db.AddQuery("BEGIN", &sqltypes.Result{})
db.AddQuery("COMMIT", &sqltypes.Result{})
db.AddQueryPattern(`SET @@session\.sql_log_bin = .*`, &sqltypes.Result{})
db.AddQueryPattern(`CREATE TABLE IF NOT EXISTS _vt\.shard_metadata .*`, &sqltypes.Result{})
db.AddQueryPattern(`CREATE TABLE IF NOT EXISTS _vt\.local_metadata .*`, &sqltypes.Result{})
db.AddQueryPattern(`ALTER TABLE _vt\.local_metadata .*`, &sqltypes.Result{})
db.AddQueryPattern(`ALTER TABLE _vt\.shard_metadata .*`, &sqltypes.Result{})
db.AddQueryPattern(`UPDATE _vt\.local_metadata SET db_name=.*`, &sqltypes.Result{})
db.AddQueryPattern(`UPDATE _vt\.shard_metadata SET db_name=.*`, &sqltypes.Result{})
db.AddQueryPattern(`INSERT INTO _vt\.local_metadata .*`, &sqltypes.Result{})

// Initialize our temp dirs
root, err := ioutil.TempDir("", "backuptest")
require.NoError(t, err)
defer os.RemoveAll(root)

// Initialize BackupStorage
fbsRoot := path.Join(root, "fbs")
*filebackupstorage.FileBackupStorageRoot = fbsRoot
*backupstorage.BackupStorageImplementation = "file"

// Initialize the fake mysql root directories
sourceInnodbDataDir := path.Join(root, "source_innodb_data")
sourceInnodbLogDir := path.Join(root, "source_innodb_log")
sourceDataDir := path.Join(root, "source_data")
sourceDataDbDir := path.Join(sourceDataDir, "vt_db")
for _, s := range []string{sourceInnodbDataDir, sourceInnodbLogDir, sourceDataDbDir} {
require.NoError(t, os.MkdirAll(s, os.ModePerm))
}
require.NoError(t, ioutil.WriteFile(path.Join(sourceInnodbDataDir, "innodb_data_1"), []byte("innodb data 1 contents"), os.ModePerm))
require.NoError(t, ioutil.WriteFile(path.Join(sourceInnodbLogDir, "innodb_log_1"), []byte("innodb log 1 contents"), os.ModePerm))
require.NoError(t, ioutil.WriteFile(path.Join(sourceDataDbDir, "db.opt"), []byte("db opt file"), os.ModePerm))

// create a master tablet, set its master position
master := NewFakeTablet(t, wr, "cell1", 0, topodatapb.TabletType_MASTER, db)
master.FakeMysqlDaemon.ReadOnly = false
master.FakeMysqlDaemon.Replicating = false
master.FakeMysqlDaemon.CurrentMasterPosition = mysql.Position{
GTIDSet: mysql.MariadbGTIDSet{
2: mysql.MariadbGTID{
Domain: 2,
Server: 123,
Sequence: 457,
},
},
}

// start master so that replica can fetch master position from it
master.StartActionLoop(t, wr)
defer master.StopActionLoop(t)

// create a single tablet, set it up so we can do backups
// set its position same as that of master so that backup doesn't wait for catchup
sourceTablet := NewFakeTablet(t, wr, "cell1", 1, topodatapb.TabletType_REPLICA, db)
sourceTablet.FakeMysqlDaemon.ReadOnly = true
sourceTablet.FakeMysqlDaemon.Replicating = true
sourceTablet.FakeMysqlDaemon.CurrentMasterPosition = mysql.Position{
GTIDSet: mysql.MariadbGTIDSet{
2: mysql.MariadbGTID{
Domain: 2,
Server: 123,
Sequence: 456,
},
},
}
sourceTablet.FakeMysqlDaemon.ExpectedExecuteSuperQueryList = []string{
"STOP SLAVE",
"START SLAVE",
}
sourceTablet.StartActionLoop(t, wr)
defer sourceTablet.StopActionLoop(t)

sourceTablet.TM.Cnf = &mysqlctl.Mycnf{
DataDir: sourceDataDir,
InnodbDataHomeDir: sourceInnodbDataDir,
InnodbLogGroupHomeDir: sourceInnodbLogDir,
}

errCh := make(chan error, 1)
go func(ctx context.Context, tablet *FakeTablet) {
errCh <- vp.Run([]string{"Backup", topoproto.TabletAliasString(tablet.Tablet.Alias)})
}(ctx, sourceTablet)

timer := time.NewTicker(1 * time.Second)
<-timer.C
sourceTablet.FakeMysqlDaemon.CurrentMasterPositionLocked(mysql.Position{
GTIDSet: mysql.MariadbGTIDSet{
2: mysql.MariadbGTID{
Domain: 2,
Server: 123,
Sequence: 457,
},
},
})

timer2 := time.NewTicker(5 * time.Second)
select {
case err := <-errCh:
require.Nil(t, err)
// verify the full status
// verify the full status
require.NoError(t, sourceTablet.FakeMysqlDaemon.CheckSuperQueryList())
assert.True(t, sourceTablet.FakeMysqlDaemon.Replicating)
assert.True(t, sourceTablet.FakeMysqlDaemon.Running)
assert.Equal(t, master.FakeMysqlDaemon.CurrentMasterPosition, sourceTablet.FakeMysqlDaemon.CurrentMasterPosition)
case <-timer2.C:
require.FailNow(t, "Backup timed out")
}

// create a destination tablet, set it up so we can do restores
destTablet := NewFakeTablet(t, wr, "cell1", 2, topodatapb.TabletType_REPLICA, db)
destTablet.FakeMysqlDaemon.ReadOnly = true
destTablet.FakeMysqlDaemon.Replicating = true
destTablet.FakeMysqlDaemon.CurrentMasterPosition = mysql.Position{
GTIDSet: mysql.MariadbGTIDSet{
2: mysql.MariadbGTID{
Domain: 2,
Server: 123,
Sequence: 456,
},
},
}
destTablet.FakeMysqlDaemon.ExpectedExecuteSuperQueryList = []string{
"STOP SLAVE",
"RESET SLAVE ALL",
"FAKE SET SLAVE POSITION",
"FAKE SET MASTER",
"START SLAVE",
}
destTablet.FakeMysqlDaemon.FetchSuperQueryMap = map[string]*sqltypes.Result{
"SHOW DATABASES": {},
}
destTablet.FakeMysqlDaemon.SetReplicationPositionPos = destTablet.FakeMysqlDaemon.CurrentMasterPosition
destTablet.FakeMysqlDaemon.SetMasterInput = topoproto.MysqlAddr(master.Tablet)

destTablet.StartActionLoop(t, wr)
defer destTablet.StopActionLoop(t)

destTablet.TM.Cnf = &mysqlctl.Mycnf{
DataDir: sourceDataDir,
InnodbDataHomeDir: sourceInnodbDataDir,
InnodbLogGroupHomeDir: sourceInnodbLogDir,
BinLogPath: path.Join(root, "bin-logs/filename_prefix"),
RelayLogPath: path.Join(root, "relay-logs/filename_prefix"),
RelayLogIndexPath: path.Join(root, "relay-log.index"),
RelayLogInfoPath: path.Join(root, "relay-log.info"),
}

errCh = make(chan error, 1)
go func(ctx context.Context, tablet *FakeTablet) {
errCh <- tablet.TM.RestoreData(ctx, logutil.NewConsoleLogger(), 0 /* waitForBackupInterval */, false /* deleteBeforeRestore */)
}(ctx, destTablet)

timer = time.NewTicker(1 * time.Second)
<-timer.C
destTablet.FakeMysqlDaemon.CurrentMasterPositionLocked(mysql.Position{
GTIDSet: mysql.MariadbGTIDSet{
2: mysql.MariadbGTID{
Domain: 2,
Server: 123,
Sequence: 457,
},
},
})

timer2 = time.NewTicker(5 * time.Second)
select {
case err := <-errCh:
require.Nil(t, err)
// verify the full status
require.NoError(t, destTablet.FakeMysqlDaemon.CheckSuperQueryList(), "destTablet.FakeMysqlDaemon.CheckSuperQueryList failed")
assert.True(t, destTablet.FakeMysqlDaemon.Replicating)
assert.True(t, destTablet.FakeMysqlDaemon.Running)
assert.Equal(t, master.FakeMysqlDaemon.CurrentMasterPosition, destTablet.FakeMysqlDaemon.CurrentMasterPosition)
case <-timer2.C:
require.FailNow(t, "Restore timed out")
}
}

func TestRestoreUnreachableMaster(t *testing.T) {
delay := discovery.GetTabletPickerRetryDelay()
defer func() {
Expand Down