Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
5 changes: 5 additions & 0 deletions go/vt/vttablet/tabletmanager/rpc_replication.go
Original file line number Diff line number Diff line change
Expand Up @@ -534,6 +534,11 @@ func (agent *ActionAgent) setMasterLocked(ctx context.Context, parentAlias *topo
if err == nil && (rs.SlaveIORunning || rs.SlaveSQLRunning) {
wasReplicating = true
shouldbeReplicating = true
} else if err == mysql.ErrNotSlave {
// If we used to be a master, or if we started as a replica but never
// found out who the master is, we always try to start replicating once
// we are told who the new master is via SetMaster.
shouldbeReplicating = true
}
if forceStartSlave {
shouldbeReplicating = true
Expand Down
160 changes: 92 additions & 68 deletions go/vt/wrangler/reparent.go
Original file line number Diff line number Diff line change
Expand Up @@ -376,7 +376,7 @@ func (wr *Wrangler) plannedReparentShardLocked(ctx context.Context, ev *events.R
return err
}

// Check corner cases we're going to depend on
// Check invariants we're going to depend on.
if topoproto.TabletAliasEqual(masterElectTabletAlias, avoidMasterTabletAlias) {
return fmt.Errorf("master-elect tablet %v is the same as the tablet to avoid", topoproto.TabletAliasString(masterElectTabletAlias))
}
Expand All @@ -402,50 +402,65 @@ func (wr *Wrangler) plannedReparentShardLocked(ctx context.Context, ev *events.R
return fmt.Errorf("master-elect tablet %v is not in the shard", masterElectTabletAliasStr)
}
ev.NewMaster = *masterElectTabletInfo.Tablet
if topoproto.TabletAliasEqual(shardInfo.MasterAlias, masterElectTabletAlias) {
return fmt.Errorf("master-elect tablet %v is already the master", masterElectTabletAliasStr)
}
if topoproto.TabletAliasIsZero(shardInfo.MasterAlias) {
return fmt.Errorf("the shard has no master, use EmergencyReparentShard")
}
oldMasterTabletInfo, ok := tabletMap[topoproto.TabletAliasString(shardInfo.MasterAlias)]
if !ok {
return fmt.Errorf("old master tablet %v is not in the shard", topoproto.TabletAliasString(shardInfo.MasterAlias))
}
ev.OldMaster = *oldMasterTabletInfo.Tablet

// create a new context for the short running remote operations
remoteCtx, remoteCancel := context.WithTimeout(ctx, *topo.RemoteOperationTimeout)
defer remoteCancel()

// Demote the current master, get its replication position
wr.logger.Infof("demote current master %v", shardInfo.MasterAlias)
event.DispatchUpdate(ev, "demoting old master")
rp, err := wr.tmc.DemoteMaster(remoteCtx, oldMasterTabletInfo.Tablet)
if err != nil {
return fmt.Errorf("old master tablet %v DemoteMaster failed: %v", topoproto.TabletAliasString(shardInfo.MasterAlias), err)
}
var reparentJournalPos string

remoteCtx, remoteCancel = context.WithTimeout(ctx, waitSlaveTimeout)
defer remoteCancel()
if topoproto.TabletAliasEqual(shardInfo.MasterAlias, masterElectTabletAlias) {
// If the master is already the one we want, we just try to fix replicas (below).
rp, err := wr.tmc.MasterPosition(remoteCtx, masterElectTabletInfo.Tablet)
if err != nil {
return fmt.Errorf("can't get current replication position of master: %v", err)
}
reparentJournalPos = rp
} else {
// If the current master is not the one we want, demote the old master.
// Note that since the shard record is eventually consistent, we might be
// demoting a former master that was already demoted.

// Wait on the master-elect tablet until it reaches that position,
// then promote it
wr.logger.Infof("promote slave %v", masterElectTabletAliasStr)
event.DispatchUpdate(ev, "promoting slave")
rp, err = wr.tmc.PromoteSlaveWhenCaughtUp(remoteCtx, masterElectTabletInfo.Tablet, rp)
if err != nil || (ctx.Err() != nil && ctx.Err() == context.DeadlineExceeded) {
remoteCancel()
// if this fails it is not enough to return an error. we should rollback all the changes made by DemoteMaster
remoteCtx, remoteCancel = context.WithTimeout(ctx, *topo.RemoteOperationTimeout)
defer remoteCancel()
if err1 := wr.tmc.UndoDemoteMaster(remoteCtx, oldMasterTabletInfo.Tablet); err1 != nil {
log.Warningf("Encountered error %v while trying to undo DemoteMaster", err1)
oldMasterTabletInfo, ok := tabletMap[topoproto.TabletAliasString(shardInfo.MasterAlias)]
if !ok {
return fmt.Errorf("old master tablet %v is not in the shard", topoproto.TabletAliasString(shardInfo.MasterAlias))
}
ev.OldMaster = *oldMasterTabletInfo.Tablet

// Demote the old master and get its replication position.
wr.logger.Infof("demote current master %v", shardInfo.MasterAlias)
event.DispatchUpdate(ev, "demoting old master")
rp, err := wr.tmc.DemoteMaster(remoteCtx, oldMasterTabletInfo.Tablet)
if err != nil {
return fmt.Errorf("old master tablet %v DemoteMaster failed: %v", topoproto.TabletAliasString(shardInfo.MasterAlias), err)
}
return fmt.Errorf("master-elect tablet %v failed to catch up with replication or be upgraded to master: %v", masterElectTabletAliasStr, err)

// Wait on the master-elect tablet until it reaches that position,
// then promote it.
wr.logger.Infof("promote replica %v", masterElectTabletAliasStr)
event.DispatchUpdate(ev, "promoting replica")
rp, err = wr.tmc.PromoteSlaveWhenCaughtUp(remoteCtx, masterElectTabletInfo.Tablet, rp)
if err != nil || (ctx.Err() != nil && ctx.Err() == context.DeadlineExceeded) {
remoteCancel()
// If we fail to promote the new master, try to roll back to the
// original master before aborting.
remoteCtx, remoteCancel = context.WithTimeout(ctx, *topo.RemoteOperationTimeout)
defer remoteCancel()
if err1 := wr.tmc.UndoDemoteMaster(remoteCtx, oldMasterTabletInfo.Tablet); err1 != nil {
log.Warningf("Encountered error %v while trying to undo DemoteMaster", err1)
}
return fmt.Errorf("master-elect tablet %v failed to catch up with replication or be upgraded to master: %v", masterElectTabletAliasStr, err)
}
reparentJournalPos = rp
}

// Check we still have the topology lock.
remoteCtx, remoteCancel = context.WithTimeout(ctx, waitSlaveTimeout)
defer remoteCancel()

// Check we still have the topology lock.
if err := topo.CheckShardLocked(ctx, keyspace, shard); err != nil {
return fmt.Errorf("lost topology lock, aborting: %v", err)
}
Expand All @@ -459,59 +474,68 @@ func (wr *Wrangler) plannedReparentShardLocked(ctx context.Context, ev *events.R
// - new master: populate the reparent journal
// - everybody else: reparent to new master, wait for row
event.DispatchUpdate(ev, "reparenting all tablets")
now := time.Now().UnixNano()
wgMaster := sync.WaitGroup{}
wgSlaves := sync.WaitGroup{}

// We add a (hopefully) unique record to the reparent journal table on the
// new master so we can check if replicas got it through replication.
reparentJournalTimestamp := time.Now().UnixNano()

// Point all replicas at the new master and check that they receive the
// reparent journal entry, proving they are replicating from the new master.
// We do this concurrently with adding the journal entry (below), because
// if semi-sync is enabled, the update to the journal table can't succeed
// until at least one replica is successfully attached to the new master.
wgReplicas := sync.WaitGroup{}
rec := concurrency.AllErrorRecorder{}
var masterErr error
oldMasterTabletInfoAliasStr := topoproto.TabletAliasString(oldMasterTabletInfo.Alias)
for alias, tabletInfo := range tabletMap {
if alias == masterElectTabletAliasStr {
wgMaster.Add(1)
go func(alias string, tabletInfo *topo.TabletInfo) {
defer wgMaster.Done()
wr.logger.Infof("populating reparent journal on new master %v", alias)
masterErr = wr.tmc.PopulateReparentJournal(replCtx, tabletInfo.Tablet, now, plannedReparentShardOperation, masterElectTabletAlias, rp)
}(alias, tabletInfo)
} else {
wgSlaves.Add(1)
go func(alias string, tabletInfo *topo.TabletInfo) {
defer wgSlaves.Done()
wr.logger.Infof("setting new master on slave %v", alias)
// also restart replication on old master
forceStartSlave := alias == oldMasterTabletInfoAliasStr
if err := wr.tmc.SetMaster(replCtx, tabletInfo.Tablet, masterElectTabletAlias, now, forceStartSlave); err != nil {
rec.RecordError(fmt.Errorf("tablet %v SetMaster failed: %v", alias, err))
return
}
}(alias, tabletInfo)
continue
}
wgReplicas.Add(1)
go func(alias string, tabletInfo *topo.TabletInfo) {
defer wgReplicas.Done()
wr.logger.Infof("setting new master on slave %v", alias)

// We used to force slave start on the old master, but now that
// we support "resuming" a PRS attempt that failed, we can no
// longer assume that we know who the old master was.
// Instead, we rely on the old master to remember that it needs
// to start replication after being converted to a replica.
forceStartReplication := false

if err := wr.tmc.SetMaster(replCtx, tabletInfo.Tablet, masterElectTabletAlias, reparentJournalTimestamp, forceStartReplication); err != nil {
rec.RecordError(fmt.Errorf("tablet %v SetMaster failed: %v", alias, err))
return
}
}(alias, tabletInfo)
}

// After the master is done, we can update the shard record
// (note with semi-sync, it also means at least one slave is done)
wgMaster.Wait()
if masterErr != nil {
// The master failed, there is no way the
// slaves will work. So we cancel them all.
wr.logger.Warningf("master failed to PopulateReparentJournal, canceling slaves")
// Add a reparent journal entry on the new master.
wr.logger.Infof("populating reparent journal on new master %v", masterElectTabletAliasStr)
err = wr.tmc.PopulateReparentJournal(replCtx, masterElectTabletInfo.Tablet, reparentJournalTimestamp, plannedReparentShardOperation, masterElectTabletAlias, reparentJournalPos)
if err != nil {
// The master failed. There's no way the replicas will work, so cancel them all.
wr.logger.Warningf("master failed to PopulateReparentJournal, canceling replica reparent attempts")
replCancel()
wgSlaves.Wait()
return fmt.Errorf("failed to PopulateReparentJournal on master: %v", masterErr)
wgReplicas.Wait()
return fmt.Errorf("failed to PopulateReparentJournal on master: %v", err)
}

// After the master is done, we can update the shard record.
// TODO(deepthi): Remove this when we make the master tablet responsible for
// updating the shard record.
wr.logger.Infof("updating shard record with new master %v", masterElectTabletAlias)
if _, err := wr.ts.UpdateShardFields(ctx, keyspace, shard, func(si *topo.ShardInfo) error {
si.MasterAlias = masterElectTabletAlias
return nil
}); err != nil {
wgSlaves.Wait()
wgReplicas.Wait()
return fmt.Errorf("failed to update shard master record: %v", err)
}

// Wait for the slaves to complete.
wgSlaves.Wait()
// Wait for the replicas to complete.
wgReplicas.Wait()
if err := rec.Error(); err != nil {
wr.Logger().Errorf2(err, "some slaves failed to reparent")
wr.Logger().Errorf2(err, "some replicas failed to reparent; retry PlannedReparentShard with the same new master alias to retry failed replicas")
return err
}

Expand Down
2 changes: 2 additions & 0 deletions go/vt/wrangler/testlib/planned_reparent_shard_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -75,6 +75,7 @@ func TestPlannedReparentShardNoMasterProvided(t *testing.T) {
// old master
oldMaster.FakeMysqlDaemon.ReadOnly = false
oldMaster.FakeMysqlDaemon.Replicating = false
oldMaster.FakeMysqlDaemon.SlaveStatusError = mysql.ErrNotSlave
oldMaster.FakeMysqlDaemon.CurrentMasterPosition = newMaster.FakeMysqlDaemon.WaitMasterPosition
oldMaster.FakeMysqlDaemon.SetMasterInput = topoproto.MysqlAddr(newMaster.Tablet)
oldMaster.FakeMysqlDaemon.ExpectedExecuteSuperQueryList = []string{
Expand Down Expand Up @@ -181,6 +182,7 @@ func TestPlannedReparentShard(t *testing.T) {
// old master
oldMaster.FakeMysqlDaemon.ReadOnly = false
oldMaster.FakeMysqlDaemon.Replicating = false
oldMaster.FakeMysqlDaemon.SlaveStatusError = mysql.ErrNotSlave
oldMaster.FakeMysqlDaemon.CurrentMasterPosition = newMaster.FakeMysqlDaemon.WaitMasterPosition
oldMaster.FakeMysqlDaemon.SetMasterInput = topoproto.MysqlAddr(newMaster.Tablet)
oldMaster.FakeMysqlDaemon.ExpectedExecuteSuperQueryList = []string{
Expand Down
17 changes: 12 additions & 5 deletions test/reparent.py
Original file line number Diff line number Diff line change
Expand Up @@ -300,6 +300,13 @@ def _test_reparent_graceful(self, shard_id):
'-new_master', tablet_62044.tablet_alias], auto_log=True)
utils.validate_topology()

# Perform a repeat of the same graceful reparent operation.
# It should be idempotent.
utils.run_vtctl(['PlannedReparentShard',
'-keyspace_shard', 'test_keyspace/' + shard_id,
'-new_master', tablet_62044.tablet_alias], auto_log=True)
utils.validate_topology()

self._check_master_tablet(tablet_62044)

# insert data into the new master, check the connected slaves work
Expand Down Expand Up @@ -621,10 +628,10 @@ def test_reparent_with_down_slave(self, shard_id='0'):

utils.pause('check orphan')

# reparent the tablet (will not start replication, so we have to
# do it ourselves), then it should catch up on replication really quickly
utils.run_vtctl(['ReparentTablet', tablet_41983.tablet_alias])
utils.run_vtctl(['StartSlave', tablet_41983.tablet_alias])
# Use the same PlannedReparentShard command to fix up the tablet.
utils.run_vtctl(['PlannedReparentShard',
'-keyspace_shard', 'test_keyspace/' + shard_id,
'-new_master', tablet_62044.tablet_alias])

# wait until it gets the data
self._check_vt_insert_test(tablet_41983, 3)
Expand Down Expand Up @@ -754,7 +761,7 @@ def test_reparent_doesnt_hang_if_master_fails(self):
'-keyspace_shard', 'test_keyspace/0',
'-new_master', tablet_62044.tablet_alias],
expect_fail=True)
self.assertIn('master failed to PopulateReparentJournal, canceling slaves',
self.assertIn('master failed to PopulateReparentJournal',
stderr)

# Clean up the tablets.
Expand Down