diff --git a/go.mod b/go.mod index 1ff58dc17cc..327920585cd 100644 --- a/go.mod +++ b/go.mod @@ -96,5 +96,6 @@ require ( k8s.io/apiextensions-apiserver v0.17.3 k8s.io/apimachinery v0.17.3 k8s.io/client-go v0.17.3 + k8s.io/utils v0.0.0-20191114184206-e782cd3c129f sigs.k8s.io/yaml v1.1.0 ) diff --git a/go/mysql/filepos_gtid.go b/go/mysql/filepos_gtid.go index 8a276f0a018..eb55bc33e3a 100644 --- a/go/mysql/filepos_gtid.go +++ b/go/mysql/filepos_gtid.go @@ -44,8 +44,8 @@ func parseFilePosGTID(s string) (GTID, error) { }, nil } -// parseFilePosGTIDSet is registered as a GTIDSet parser. -func parseFilePosGTIDSet(s string) (GTIDSet, error) { +// ParseFilePosGTIDSet is registered as a GTIDSet parser. +func ParseFilePosGTIDSet(s string) (GTIDSet, error) { gtid, err := parseFilePosGTID(s) if err != nil { return nil, err @@ -156,6 +156,6 @@ func (gtid filePosGTID) Last() string { func init() { gtidParsers[FilePosFlavorID] = parseFilePosGTID - gtidSetParsers[FilePosFlavorID] = parseFilePosGTIDSet + gtidSetParsers[FilePosFlavorID] = ParseFilePosGTIDSet flavors[FilePosFlavorID] = newFilePosFlavor } diff --git a/go/test/endtoend/reparent/reparent_test.go b/go/test/endtoend/reparent/reparent_test.go index 33d3bd8a9bb..bc5933e0636 100644 --- a/go/test/endtoend/reparent/reparent_test.go +++ b/go/test/endtoend/reparent/reparent_test.go @@ -25,14 +25,13 @@ import ( "testing" "time" - "vitess.io/vitess/go/vt/log" - - "vitess.io/vitess/go/mysql" - "github.com/stretchr/testify/assert" "github.com/stretchr/testify/require" + "vitess.io/vitess/go/json2" + "vitess.io/vitess/go/mysql" "vitess.io/vitess/go/test/endtoend/cluster" + "vitess.io/vitess/go/vt/log" querypb "vitess.io/vitess/go/vt/proto/query" topodatapb "vitess.io/vitess/go/vt/proto/topodata" ) @@ -110,18 +109,34 @@ func TestReparentDownMaster(t *testing.T) { err = clusterInstance.VtctlclientProcess.ExecuteCommand( "-action_timeout", "1s", "PlannedReparentShard", + "-wait_replicas_timeout", "5s", "-keyspace_shard", keyspaceShard, 
"-new_master", tablet62044.Alias) require.Error(t, err) // Run forced reparent operation, this should now proceed unimpeded. - err = clusterInstance.VtctlclientProcess.ExecuteCommand( + out, err := clusterInstance.VtctlclientProcess.ExecuteCommandWithOutput( "EmergencyReparentShard", "-keyspace_shard", keyspaceShard, "-new_master", tablet62044.Alias, - "-wait_replicas_timeout", "31s") + "-wait_replicas_timeout", "30s") + log.Infof("EmergencyReparentShard Output: %v", out) + require.Nil(t, err) require.NoError(t, err) + // Check that old master tablet is left around for human intervention. + out, err = clusterInstance.VtctlclientProcess.ExecuteCommandWithOutput("Validate") + require.Error(t, err) + require.Contains(t, out, "already has master") + + // Now we'll manually remove it, simulating a human cleaning up a dead master. + err = clusterInstance.VtctlclientProcess.ExecuteCommand( + "DeleteTablet", + "-allow_master", + tablet62344.Alias) + require.NoError(t, err) + + // Now validate topo is correct. 
validateTopology(t, false) checkMasterTablet(t, tablet62044) @@ -138,6 +153,113 @@ func TestReparentDownMaster(t *testing.T) { tablet62344.MysqlctlProcess.InitMysql = false err = tablet62344.MysqlctlProcess.Start() require.NoError(t, err) + err = clusterInstance.VtctlclientProcess.InitTablet(tablet62344, tablet62344.Cell, keyspaceName, hostname, shardName) + require.NoError(t, err) + + // As there is already a master the new replica will come directly in SERVING state + tablet62344.VttabletProcess.ServingStatus = "SERVING" + // Start the tablet + err = tablet62344.VttabletProcess.Setup() + require.NoError(t, err) + + err = checkInsertedValues(ctx, t, tablet62344, 2) + require.NoError(t, err) + + // Kill tablets + killTablets(t) +} + +func TestReparentNoChoiceDownMaster(t *testing.T) { + defer cluster.PanicHandler(t) + ctx := context.Background() + + for _, tablet := range []cluster.Vttablet{*tablet62344, *tablet62044, *tablet41983, *tablet31981} { + // Create Database + err := tablet.VttabletProcess.CreateDB(keyspaceName) + require.NoError(t, err) + + // Reset status, don't wait for the tablet status. 
We will check it later + tablet.VttabletProcess.ServingStatus = "" + + // Start the tablet + err = tablet.VttabletProcess.Setup() + require.NoError(t, err) + } + + for _, tablet := range []cluster.Vttablet{*tablet62344, *tablet62044, *tablet41983, *tablet31981} { + err := tablet.VttabletProcess.WaitForTabletTypes([]string{"SERVING", "NOT_SERVING"}) + require.NoError(t, err) + } + + // Init Shard Master + err := clusterInstance.VtctlclientProcess.ExecuteCommand("InitShardMaster", + "-force", fmt.Sprintf("%s/%s", keyspaceName, shardName), tablet62344.Alias) + require.NoError(t, err) + + validateTopology(t, true) + + // create Tables + runSQL(ctx, t, sqlSchema, tablet62344) + + // insert data into the old master, check the connected replica work + insertSQL1 := fmt.Sprintf(insertSQL, 2, 2) + runSQL(ctx, t, insertSQL1, tablet62344) + err = checkInsertedValues(ctx, t, tablet62044, 2) + require.NoError(t, err) + err = checkInsertedValues(ctx, t, tablet41983, 2) + require.NoError(t, err) + err = checkInsertedValues(ctx, t, tablet31981, 2) + require.NoError(t, err) + + // Make the current master agent and database unavailable. + err = tablet62344.VttabletProcess.TearDown() + require.NoError(t, err) + err = tablet62344.MysqlctlProcess.Stop() + require.NoError(t, err) + + // Run forced reparent operation, this should now proceed unimpeded. + err = clusterInstance.VtctlclientProcess.ExecuteCommand( + "EmergencyReparentShard", + "-keyspace_shard", keyspaceShard, + "-wait_replicas_timeout", "30s") + require.NoError(t, err) + + // Check that old master tablet is left around for human intervention. + out, err := clusterInstance.VtctlclientProcess.ExecuteCommandWithOutput("Validate") + require.Error(t, err) + require.Contains(t, out, "already has master") + + // Now we'll manually remove the old master, simulating a human cleaning up a dead master. 
+ err = clusterInstance.VtctlclientProcess.ExecuteCommand( + "DeleteTablet", + "-allow_master", + tablet62344.Alias) + require.NoError(t, err) + + // Now validate topo is correct. + validateTopology(t, false) + + var newMasterTablet *cluster.Vttablet + for _, tablet := range []*cluster.Vttablet{tablet62044, tablet41983, tablet31981} { + if isHealthyMasterTablet(t, tablet) { + newMasterTablet = tablet + break + } + } + require.NotNil(t, newMasterTablet) + // Validate new master is not old master. + require.NotEqual(t, newMasterTablet.Alias, tablet62344.Alias) + + // Check new master has latest transaction. + err = checkInsertedValues(ctx, t, newMasterTablet, 2) + require.NoError(t, err) + + // bring back the old master as a replica, check that it catches up + tablet62344.MysqlctlProcess.InitMysql = false + err = tablet62344.MysqlctlProcess.Start() + require.NoError(t, err) + err = clusterInstance.VtctlclientProcess.InitTablet(tablet62344, tablet62344.Cell, keyspaceName, hostname, shardName) + require.NoError(t, err) // As there is already a master the new replica will come directly in SERVING state tablet62344.VttabletProcess.ServingStatus = "SERVING" @@ -152,6 +274,132 @@ func TestReparentDownMaster(t *testing.T) { killTablets(t) } +func TestReparentIgnoreReplicas(t *testing.T) { + defer cluster.PanicHandler(t) + ctx := context.Background() + + for _, tablet := range []cluster.Vttablet{*tablet62344, *tablet62044, *tablet41983, *tablet31981} { + // Create Database + err := tablet.VttabletProcess.CreateDB(keyspaceName) + require.Nil(t, err) + + // Reset status, don't wait for the tablet status. 
We will check it later + tablet.VttabletProcess.ServingStatus = "" + // Init Tablet + err = clusterInstance.VtctlclientProcess.InitTablet(&tablet, tablet.Cell, keyspaceName, hostname, shardName) + require.Nil(t, err) + + // Start the tablet + err = tablet.VttabletProcess.Setup() + require.Nil(t, err) + } + + for _, tablet := range []cluster.Vttablet{*tablet62344, *tablet62044, *tablet41983, *tablet31981} { + err := tablet.VttabletProcess.WaitForTabletTypes([]string{"SERVING", "NOT_SERVING"}) + require.Nil(t, err) + } + + // Init Shard Master. + err := clusterInstance.VtctlclientProcess.ExecuteCommand("InitShardMaster", + "-force", fmt.Sprintf("%s/%s", keyspaceName, shardName), tablet62344.Alias) + require.Nil(t, err) + + validateTopology(t, true) + + // Create Tables. + runSQL(ctx, t, sqlSchema, tablet62344) + + // insert data into the old master, check the connected replica work + insertSQL1 := fmt.Sprintf(insertSQL, 2, 2) + runSQL(ctx, t, insertSQL1, tablet62344) + err = checkInsertedValues(ctx, t, tablet62044, 2) + require.Nil(t, err) + err = checkInsertedValues(ctx, t, tablet41983, 2) + require.Nil(t, err) + err = checkInsertedValues(ctx, t, tablet31981, 2) + require.Nil(t, err) + + // Make the current master agent and database unavailable. + err = tablet62344.VttabletProcess.TearDown() + require.Nil(t, err) + err = tablet62344.MysqlctlProcess.Stop() + require.Nil(t, err) + + // Take down a replica - this should cause the emergency reparent to fail. + err = tablet41983.VttabletProcess.TearDown() + require.Nil(t, err) + err = tablet41983.MysqlctlProcess.Stop() + require.Nil(t, err) + + // We expect this one to fail because we have an unreachable replica + err = clusterInstance.VtctlclientProcess.ExecuteCommand( + "EmergencyReparentShard", + "-keyspace_shard", keyspaceShard, + "-wait_replicas_timeout", "30s") + require.NotNil(t, err) + + // Now let's run it again, but set the command to ignore the unreachable replica. 
+ err = clusterInstance.VtctlclientProcess.ExecuteCommand( + "EmergencyReparentShard", + "-keyspace_shard", keyspaceShard, + "-ignore_replicas", tablet41983.Alias, + "-wait_replicas_timeout", "30s") + require.Nil(t, err) + + // We'll bring back the replica we took down. + tablet41983.MysqlctlProcess.InitMysql = false + err = tablet41983.MysqlctlProcess.Start() + require.Nil(t, err) + err = clusterInstance.VtctlclientProcess.InitTablet(tablet41983, tablet41983.Cell, keyspaceName, hostname, shardName) + require.Nil(t, err) + + // Check that old master tablet is left around for human intervention. + err = clusterInstance.VtctlclientProcess.ExecuteCommand("Validate") + require.Error(t, err) + + // Now we'll manually remove the old master, simulating a human cleaning up a dead master. + err = clusterInstance.VtctlclientProcess.ExecuteCommand( + "DeleteTablet", + "-allow_master", + tablet62344.Alias) + require.Nil(t, err) + + // Now validate topo is correct. + validateTopology(t, false) + + var newMasterTablet *cluster.Vttablet + for _, tablet := range []*cluster.Vttablet{tablet62044, tablet41983, tablet31981} { + if isHealthyMasterTablet(t, tablet) { + newMasterTablet = tablet + break + } + } + require.NotNil(t, newMasterTablet) + + // Check new master has latest transaction. 
+ err = checkInsertedValues(ctx, t, newMasterTablet, 2) + require.Nil(t, err) + + // bring back the old master as a replica, check that it catches up + tablet62344.MysqlctlProcess.InitMysql = false + err = tablet62344.MysqlctlProcess.Start() + require.Nil(t, err) + err = clusterInstance.VtctlclientProcess.InitTablet(tablet62344, tablet62344.Cell, keyspaceName, hostname, shardName) + require.Nil(t, err) + + // As there is already a master the new replica will come directly in SERVING state + tablet62344.VttabletProcess.ServingStatus = "SERVING" + // Start the tablet + err = tablet62344.VttabletProcess.Setup() + require.Nil(t, err) + + err = checkInsertedValues(ctx, t, tablet62344, 2) + require.Nil(t, err) + + // Kill tablets + killTablets(t) +} + func TestReparentCrossCell(t *testing.T) { defer cluster.PanicHandler(t) @@ -820,7 +1068,30 @@ func checkMasterTablet(t *testing.T, tablet *cluster.Vttablet) { assert.True(t, streamHealthResponse.GetServing()) tabletType := streamHealthResponse.GetTarget().GetTabletType() assert.Equal(t, topodatapb.TabletType_MASTER, tabletType) +} + +// isHealthyMasterTablet will return if tablet is master AND healthy. 
+func isHealthyMasterTablet(t *testing.T, tablet *cluster.Vttablet) bool { + result, err := clusterInstance.VtctlclientProcess.ExecuteCommandWithOutput("GetTablet", tablet.Alias) + require.Nil(t, err) + var tabletInfo topodatapb.Tablet + err = json2.Unmarshal([]byte(result), &tabletInfo) + require.Nil(t, err) + if tabletInfo.GetType() != topodatapb.TabletType_MASTER { + return false + } + // make sure the health stream is updated + result, err = clusterInstance.VtctlclientProcess.ExecuteCommandWithOutput("VtTabletStreamHealth", "-count", "1", tablet.Alias) + require.Nil(t, err) + var streamHealthResponse querypb.StreamHealthResponse + + err = json2.Unmarshal([]byte(result), &streamHealthResponse) + require.Nil(t, err) + + assert.True(t, streamHealthResponse.GetServing()) + tabletType := streamHealthResponse.GetTarget().GetTabletType() + return tabletType == topodatapb.TabletType_MASTER } func checkInsertedValues(ctx context.Context, t *testing.T, tablet *cluster.Vttablet, index int) error { diff --git a/go/vt/mysqlctl/fakemysqldaemon/fakemysqldaemon.go b/go/vt/mysqlctl/fakemysqldaemon/fakemysqldaemon.go index 7707ac63507..f4142ffee20 100644 --- a/go/vt/mysqlctl/fakemysqldaemon/fakemysqldaemon.go +++ b/go/vt/mysqlctl/fakemysqldaemon/fakemysqldaemon.go @@ -65,6 +65,9 @@ type FakeMysqlDaemon struct { // and ReplicationStatus CurrentMasterPosition mysql.Position + // CurrentMasterFilePosition is used to determine the executed file based positioning of the master. 
+ CurrentMasterFilePosition mysql.Position + // ReplicationStatusError is used by ReplicationStatus ReplicationStatusError error @@ -225,8 +228,10 @@ func (fmd *FakeMysqlDaemon) ReplicationStatus() (mysql.ReplicationStatus, error) return mysql.ReplicationStatus{}, fmd.ReplicationStatusError } return mysql.ReplicationStatus{ - Position: fmd.CurrentMasterPosition, - SecondsBehindMaster: fmd.SecondsBehindMaster, + Position: fmd.CurrentMasterPosition, + FilePosition: fmd.CurrentMasterFilePosition, + FileRelayLogPosition: fmd.CurrentMasterFilePosition, + SecondsBehindMaster: fmd.SecondsBehindMaster, // implemented as AND to avoid changing all tests that were // previously using Replicating = false IOThreadRunning: fmd.Replicating && fmd.IOThreadRunning, @@ -242,7 +247,8 @@ func (fmd *FakeMysqlDaemon) MasterStatus(ctx context.Context) (mysql.MasterStatu return mysql.MasterStatus{}, fmd.MasterStatusError } return mysql.MasterStatus{ - Position: fmd.CurrentMasterPosition, + Position: fmd.CurrentMasterPosition, + FilePosition: fmd.CurrentMasterFilePosition, }, nil } diff --git a/go/vt/topo/topoproto/tablet.go b/go/vt/topo/topoproto/tablet.go index 7c5dd30a437..bca66befc3d 100644 --- a/go/vt/topo/topoproto/tablet.go +++ b/go/vt/topo/topoproto/tablet.go @@ -26,6 +26,8 @@ import ( "strings" "github.com/golang/protobuf/proto" + "k8s.io/apimachinery/pkg/util/sets" + "vitess.io/vitess/go/netutil" "vitess.io/vitess/go/vt/vterrors" @@ -89,6 +91,17 @@ func ParseTabletAlias(aliasStr string) (*topodatapb.TabletAlias, error) { }, nil } +// ParseTabletSet returns a set of tablets based on a provided comma separated list of tablets. +func ParseTabletSet(tabletListStr string) sets.String { + set := sets.NewString() + if tabletListStr == "" { + return set + } + list := strings.Split(tabletListStr, ",") + set.Insert(list...) 
+ return set +} + // ParseUID parses just the uid (a number) func ParseUID(value string) (uint32, error) { uid, err := strconv.ParseUint(value, 10, 32) diff --git a/go/vt/vtctl/reparent.go b/go/vt/vtctl/reparent.go index dc56e9b898a..b4cca092825 100644 --- a/go/vt/vtctl/reparent.go +++ b/go/vt/vtctl/reparent.go @@ -49,8 +49,8 @@ func init() { addCommand("Shards", command{ "EmergencyReparentShard", commandEmergencyReparentShard, - "-keyspace_shard= -new_master= [-wait_replicas_timeout=]", - "Reparents the shard to the new master. Assumes the old master is dead and not responsding."}) + "-keyspace_shard= [-new_master=] [-wait_replicas_timeout=] [-ignore_replicas=]", + "Reparents the shard to the new master. Assumes the old master is dead and not responding."}) addCommand("Shards", command{ "TabletExternallyReparented", commandTabletExternallyReparented, @@ -166,7 +166,8 @@ func commandEmergencyReparentShard(ctx context.Context, wr *wrangler.Wrangler, s *waitReplicasTimeout = *deprecatedTimeout } keyspaceShard := subFlags.String("keyspace_shard", "", "keyspace/shard of the shard that needs to be reparented") - newMaster := subFlags.String("new_master", "", "alias of a tablet that should be the new master") + newMaster := subFlags.String("new_master", "", "optional alias of a tablet that should be the new master. 
If not specified, Vitess will select the best candidate") + ignoreReplicasList := subFlags.String("ignore_replicas", "", "comma-separated list of replica tablet aliases to ignore during emergency reparent") if err := subFlags.Parse(args); err != nil { return err } @@ -178,18 +179,22 @@ func commandEmergencyReparentShard(ctx context.Context, wr *wrangler.Wrangler, s *keyspaceShard = subFlags.Arg(0) *newMaster = subFlags.Arg(1) } else if subFlags.NArg() != 0 { - return fmt.Errorf("action EmergencyReparentShard requires -keyspace_shard= -new_master=") + return fmt.Errorf("action EmergencyReparentShard requires -keyspace_shard=") } keyspace, shard, err := topoproto.ParseKeyspaceShard(*keyspaceShard) if err != nil { return err } - tabletAlias, err := topoproto.ParseTabletAlias(*newMaster) - if err != nil { - return err + var tabletAlias *topodatapb.TabletAlias + if *newMaster != "" { + tabletAlias, err = topoproto.ParseTabletAlias(*newMaster) + if err != nil { + return err + } } - return wr.EmergencyReparentShard(ctx, keyspace, shard, tabletAlias, *waitReplicasTimeout) + unreachableReplicas := topoproto.ParseTabletSet(*ignoreReplicasList) + return wr.EmergencyReparentShard(ctx, keyspace, shard, tabletAlias, *waitReplicasTimeout, unreachableReplicas) } func commandTabletExternallyReparented(ctx context.Context, wr *wrangler.Wrangler, subFlags *flag.FlagSet, args []string) error { diff --git a/go/vt/wrangler/reparent.go b/go/vt/wrangler/reparent.go index 98ff80bed62..e9dc22a0eb5 100644 --- a/go/vt/wrangler/reparent.go +++ b/go/vt/wrangler/reparent.go @@ -26,6 +26,9 @@ import ( "sync" "time" + "k8s.io/apimachinery/pkg/util/sets" + "k8s.io/utils/pointer" + "vitess.io/vitess/go/event" "vitess.io/vitess/go/mysql" "vitess.io/vitess/go/sqlescape" @@ -842,9 +845,13 @@ func (wr *Wrangler) chooseNewMaster( // EmergencyReparentShard will make the provided tablet the master for // the shard, when the old master is completely unreachable. 
-func (wr *Wrangler) EmergencyReparentShard(ctx context.Context, keyspace, shard string, masterElectTabletAlias *topodatapb.TabletAlias, waitReplicasTimeout time.Duration) (err error) { +func (wr *Wrangler) EmergencyReparentShard(ctx context.Context, keyspace, shard string, masterElectTabletAlias *topodatapb.TabletAlias, waitReplicasTimeout time.Duration, ignoredTablets sets.String) (err error) { // lock the shard - ctx, unlock, lockErr := wr.ts.LockShard(ctx, keyspace, shard, fmt.Sprintf("EmergencyReparentShard(%v)", topoproto.TabletAliasString(masterElectTabletAlias))) + actionMsg := emergencyReparentShardOperation + if masterElectTabletAlias != nil { + actionMsg += fmt.Sprintf("(%v)", topoproto.TabletAliasString(masterElectTabletAlias)) + } + ctx, unlock, lockErr := wr.ts.LockShard(ctx, keyspace, shard, actionMsg) if lockErr != nil { return lockErr } @@ -854,7 +861,7 @@ func (wr *Wrangler) EmergencyReparentShard(ctx context.Context, keyspace, shard ev := &events.Reparent{} // do the work - err = wr.emergencyReparentShardLocked(ctx, ev, keyspace, shard, masterElectTabletAlias, waitReplicasTimeout) + err = wr.emergencyReparentShardLocked(ctx, ev, keyspace, shard, masterElectTabletAlias, waitReplicasTimeout, ignoredTablets) if err != nil { event.DispatchUpdate(ev, "failed EmergencyReparentShard: "+err.Error()) } else { @@ -863,7 +870,7 @@ func (wr *Wrangler) EmergencyReparentShard(ctx context.Context, keyspace, shard return err } -func (wr *Wrangler) emergencyReparentShardLocked(ctx context.Context, ev *events.Reparent, keyspace, shard string, masterElectTabletAlias *topodatapb.TabletAlias, waitReplicasTimeout time.Duration) error { +func (wr *Wrangler) emergencyReparentShardLocked(ctx context.Context, ev *events.Reparent, keyspace, shard string, masterElectTabletAlias *topodatapb.TabletAlias, waitReplicasTimeout time.Duration, ignoredTablets sets.String) error { shardInfo, err := wr.ts.GetShard(ctx, keyspace, shard) if err != nil { return err @@ -873,120 +880,100 
@@ func (wr *Wrangler) emergencyReparentShardLocked(ctx context.Context, ev *events event.DispatchUpdate(ev, "reading all tablets") tabletMap, err := wr.ts.GetTabletMapForShard(ctx, keyspace, shard) if err != nil { - return err + return vterrors.Wrapf(err, "failed to get tablet map for shard %v in keyspace %v: %v", shard, keyspace, err) } - // Check invariants we're going to depend on. - masterElectTabletAliasStr := topoproto.TabletAliasString(masterElectTabletAlias) - masterElectTabletInfo, ok := tabletMap[masterElectTabletAliasStr] - if !ok { - return fmt.Errorf("master-elect tablet %v is not in the shard", masterElectTabletAliasStr) + statusMap, masterStatusMap, err := wr.stopReplicationAndBuildStatusMaps(ctx, ev, tabletMap, waitReplicasTimeout, ignoredTablets) + if err != nil { + return vterrors.Wrapf(err, "failed to stop replication and build status maps: %v", err) } - ev.NewMaster = *masterElectTabletInfo.Tablet - if topoproto.TabletAliasEqual(shardInfo.MasterAlias, masterElectTabletAlias) { - return fmt.Errorf("master-elect tablet %v is already the master", topoproto.TabletAliasString(masterElectTabletAlias)) - } - - // Deal with the old master: try to remote-scrap it, if it's - // truly dead we force-scrap it. Remove it from our map in any case. 
- if shardInfo.HasMaster() { - deleteOldMaster := true - shardInfoMasterAliasStr := topoproto.TabletAliasString(shardInfo.MasterAlias) - oldMasterTabletInfo, ok := tabletMap[shardInfoMasterAliasStr] - if ok { - delete(tabletMap, shardInfoMasterAliasStr) - } else { - oldMasterTabletInfo, err = wr.ts.GetTablet(ctx, shardInfo.MasterAlias) - if err != nil { - wr.logger.Warningf("cannot read old master tablet %v, won't touch it: %v", shardInfoMasterAliasStr, err) - deleteOldMaster = false - } - } - - if deleteOldMaster { - ev.OldMaster = *oldMasterTabletInfo.Tablet - wr.logger.Infof("deleting old master %v", shardInfoMasterAliasStr) - - ctx, cancel := context.WithTimeout(ctx, waitReplicasTimeout) - defer cancel() - if err := topotools.DeleteTablet(ctx, wr.ts, oldMasterTabletInfo.Tablet); err != nil { - wr.logger.Warningf("failed to delete old master tablet %v: %v", shardInfoMasterAliasStr, err) - } - } + // Check we still have the topology lock. + if err := topo.CheckShardLocked(ctx, keyspace, shard); err != nil { + return vterrors.Wrapf(err, "lost topology lock, aborting: %v", err) } - // Stop replication on all replicas, get their current - // replication position - event.DispatchUpdate(ev, "stop replication on all replicas") - wg := sync.WaitGroup{} - mu := sync.Mutex{} - statusMap := make(map[string]*replicationdatapb.StopReplicationStatus) - for alias, tabletInfo := range tabletMap { - wg.Add(1) - go func(alias string, tabletInfo *topo.TabletInfo) { - defer wg.Done() - wr.logger.Infof("getting replication position from %v", alias) - ctx, cancel := context.WithTimeout(ctx, waitReplicasTimeout) - defer cancel() - // TODO: Once we refactor EmergencyReparent, change the stopReplicationOption argument to IOThreadOnly. 
- _, stopReplicationStatus, err := wr.tmc.StopReplicationAndGetStatus(ctx, tabletInfo.Tablet, replicationdatapb.StopReplicationMode_IOANDSQLTHREAD) - if err != nil { - wr.logger.Warningf("failed to get replication status from %v, ignoring tablet: %v", alias, err) - return - } - mu.Lock() - statusMap[alias] = stopReplicationStatus - mu.Unlock() - }(alias, tabletInfo) + validCandidates, err := wr.findValidReparentCandidates(statusMap, masterStatusMap) + if err != nil { + return err + } + if len(validCandidates) == 0 { + return vterrors.Errorf(vtrpcpb.Code_FAILED_PRECONDITION, "no valid candidates for emergency reparent") } - wg.Wait() - // Check we still have the topology lock. - if err := topo.CheckShardLocked(ctx, keyspace, shard); err != nil { - return fmt.Errorf("lost topology lock, aborting: %v", err) + errChan := make(chan error) + rec := &concurrency.AllErrorRecorder{} + groupCtx, groupCancel := context.WithTimeout(ctx, waitReplicasTimeout) + defer groupCancel() + for candidate := range validCandidates { + go func(alias string) { + var err error + defer func() { errChan <- err }() + err = wr.WaitForRelayLogsToApply(groupCtx, tabletMap[alias], statusMap[alias]) + }(candidate) } - // Verify masterElect is alive and has the most advanced position - masterElectStatus, ok := statusMap[masterElectTabletAliasStr] - if !ok { - return fmt.Errorf("couldn't get master elect %v replication position", topoproto.TabletAliasString(masterElectTabletAlias)) + resultCounter := 0 + for waitErr := range errChan { + resultCounter++ + if waitErr != nil { + rec.RecordError(waitErr) + groupCancel() + } + if resultCounter == len(validCandidates) { + break + } } - masterElectStrPos := masterElectStatus.After.Position - masterElectPos, err := mysql.DecodePosition(masterElectStrPos) - if err != nil { - return fmt.Errorf("cannot decode master elect position %v: %v", masterElectStrPos, err) + if len(rec.Errors) != 0 { + return vterrors.Wrapf(rec.Error(), "could not apply all relay logs 
within the provided wait_replicas_timeout: %v", rec.Error()) } - for alias, status := range statusMap { - if alias == masterElectTabletAliasStr { + + var winningPosition mysql.Position + var newMasterTabletAliasStr string + for alias, position := range validCandidates { + if winningPosition.IsZero() { + winningPosition = position + newMasterTabletAliasStr = alias continue } - posStr := status.After.Position - pos, err := mysql.DecodePosition(posStr) - if err != nil { - return fmt.Errorf("cannot decode replica %v position %v: %v", alias, posStr, err) + if position.AtLeast(winningPosition) { + winningPosition = position + newMasterTabletAliasStr = alias } - if !masterElectPos.AtLeast(pos) { - return fmt.Errorf("tablet %v is more advanced than master elect tablet %v: %v > %v", alias, masterElectTabletAliasStr, posStr, masterElectStrPos) + } + + if masterElectTabletAlias != nil { + newMasterTabletAliasStr = topoproto.TabletAliasString(masterElectTabletAlias) + masterPos, ok := validCandidates[newMasterTabletAliasStr] + if !ok { + return vterrors.Errorf(vtrpcpb.Code_FAILED_PRECONDITION, "master elect %v has errant GTIDs", newMasterTabletAliasStr) } + if !masterPos.AtLeast(winningPosition) { + return vterrors.Errorf(vtrpcpb.Code_FAILED_PRECONDITION, "master elect: %v at position %v, is not fully caught up. Winning position: %v", newMasterTabletAliasStr, masterPos, winningPosition) + } + } + + // Check we still have the topology lock. 
+ if err := topo.CheckShardLocked(ctx, keyspace, shard); err != nil { + return vterrors.Wrapf(err, "lost topology lock, aborting: %v", err) } // Promote the masterElect - wr.logger.Infof("promote tablet %v to master", topoproto.TabletAliasString(masterElectTabletAlias)) + wr.logger.Infof("promote tablet %v to master", newMasterTabletAliasStr) event.DispatchUpdate(ev, "promoting replica") - rp, err := wr.tmc.PromoteReplica(ctx, masterElectTabletInfo.Tablet) + rp, err := wr.tmc.PromoteReplica(ctx, tabletMap[newMasterTabletAliasStr].Tablet) if err != nil { - return fmt.Errorf("master-elect tablet %v failed to be upgraded to master: %v", topoproto.TabletAliasString(masterElectTabletAlias), err) + return vterrors.Wrapf(err, "master-elect tablet %v failed to be upgraded to master: %v", newMasterTabletAliasStr, err) } - // Check we stil have the topology lock. + // Check we still have the topology lock. if err := topo.CheckShardLocked(ctx, keyspace, shard); err != nil { - return fmt.Errorf("lost topology lock, aborting: %v", err) + return vterrors.Wrapf(err, "lost topology lock, aborting: %v", err) } // Create a cancelable context for the following RPCs. // If error conditions happen, we can cancel all outgoing RPCs. 
- replCtx, replCancel := context.WithCancel(ctx) + replCtx, replCancel := context.WithTimeout(ctx, waitReplicasTimeout) defer replCancel() // Reset replication on all replicas to point to the new master, and @@ -996,53 +983,206 @@ func (wr *Wrangler) emergencyReparentShardLocked(ctx context.Context, ev *events // - everybody else: reparent to new master, wait for row event.DispatchUpdate(ev, "reparenting all tablets") now := time.Now().UnixNano() - wgMaster := sync.WaitGroup{} - wgReplicas := sync.WaitGroup{} - rec := concurrency.AllErrorRecorder{} - var masterErr error + errChan = make(chan error) + + handleMaster := func(alias string, tabletInfo *topo.TabletInfo) error { + wr.logger.Infof("populating reparent journal on new master %v", alias) + return wr.tmc.PopulateReparentJournal(replCtx, tabletInfo.Tablet, now, emergencyReparentShardOperation, tabletMap[newMasterTabletAliasStr].Alias, rp) + } + handleReplica := func(alias string, tabletInfo *topo.TabletInfo) { + var err error + defer func() { errChan <- err }() + + wr.logger.Infof("setting new master on replica %v", alias) + forceStart := false + if status, ok := statusMap[alias]; ok { + forceStart = replicaWasRunning(status) + } + err = wr.tmc.SetMaster(replCtx, tabletInfo.Tablet, tabletMap[newMasterTabletAliasStr].Alias, now, "", forceStart) + if err != nil { + err = vterrors.Wrapf(err, "tablet %v SetMaster failed: %v", alias, err) + } + } + for alias, tabletInfo := range tabletMap { - if alias == masterElectTabletAliasStr { - wgMaster.Add(1) - go func(alias string, tabletInfo *topo.TabletInfo) { - defer wgMaster.Done() - wr.logger.Infof("populating reparent journal on new master %v", alias) - masterErr = wr.tmc.PopulateReparentJournal(replCtx, tabletInfo.Tablet, now, emergencyReparentShardOperation, masterElectTabletAlias, rp) - }(alias, tabletInfo) - } else { - wgReplicas.Add(1) - go func(alias string, tabletInfo *topo.TabletInfo) { - defer wgReplicas.Done() - wr.logger.Infof("setting new master on replica 
%v", alias) - forceStart := false - if status, ok := statusMap[alias]; ok { - forceStart = replicaWasRunning(status) - } - if err := wr.tmc.SetMaster(replCtx, tabletInfo.Tablet, masterElectTabletAlias, now, "", forceStart); err != nil { - rec.RecordError(fmt.Errorf("tablet %v SetMaster failed: %v", alias, err)) - } - }(alias, tabletInfo) + if alias == newMasterTabletAliasStr { + continue + } else if !ignoredTablets.Has(alias) { + go handleReplica(alias, tabletInfo) } } - wgMaster.Wait() + masterErr := handleMaster(newMasterTabletAliasStr, tabletMap[newMasterTabletAliasStr]) if masterErr != nil { - // The master failed, there is no way the - // replicas will work. So we cancel them all. - wr.logger.Warningf("master failed to PopulateReparentJournal, canceling replicas") + wr.logger.Warningf("master failed to PopulateReparentJournal") replCancel() - wgReplicas.Wait() - return fmt.Errorf("failed to PopulateReparentJournal on master: %v", masterErr) + return vterrors.Wrapf(masterErr, "failed to PopulateReparentJournal on master: %v", masterErr) } - // Wait for the replicas to complete. If some of them fail, we - // will rebuild the shard serving graph anyway - wgReplicas.Wait() - if err := rec.Error(); err != nil { - wr.Logger().Errorf2(err, "some replicas failed to reparent") - return err + return nil +} + +// waitOnNMinusOneTablets will wait until N-1 tablets have responded via a supplied error channel. In that case that N-1 tablets have responded, +// the supplied cancel function will be called, and we will wait until N tablets return their errors, and then return an AllErrorRecorder to the caller. 
+func waitOnNMinusOneTablets(ctxCancel context.CancelFunc, tabletCount int, errorChannel chan error, acceptableErrCnt int) *concurrency.AllErrorRecorder { + errCounter := 0 + successCounter := 0 + responseCounter := 0 + rec := &concurrency.AllErrorRecorder{} + + for err := range errorChannel { + responseCounter++ + if err != nil { + errCounter++ + rec.RecordError(err) + } else { + successCounter++ + } + if responseCounter == tabletCount { + // We must wait for any cancelled goroutines to return their error. + break + } + if errCounter > acceptableErrCnt || successCounter == tabletCount-1 { + ctxCancel() + } } - return nil + return rec +} + +// findValidReparentCandidates will find valid candidates for emergency reparent, and if successful, returning them as a list of tablet aliases. +func (wr *Wrangler) findValidReparentCandidates(statusMap map[string]*replicationdatapb.StopReplicationStatus, masterStatusMap map[string]*replicationdatapb.MasterStatus) (map[string]mysql.Position, error) { + // Build out replication status list from proto types. + replicationStatusMap := make(map[string]*mysql.ReplicationStatus, len(statusMap)) + for alias, protoStatus := range statusMap { + status := mysql.ProtoToReplicationStatus(protoStatus.After) + replicationStatusMap[alias] = &status + } + + // Determine if we need to find errant GTIDs. + var gtidBased *bool + for alias, status := range replicationStatusMap { + if gtidBased == nil { + _, ok := status.RelayLogPosition.GTIDSet.(mysql.Mysql56GTIDSet) + gtidBased = pointer.BoolPtr(ok) + } else if !*gtidBased { + break + } else if status.RelayLogPosition.IsZero() { + // Bail. We have an odd one in the bunch. + return nil, vterrors.Errorf(vtrpcpb.Code_UNAVAILABLE, "encountered tablet %v with no relay log position, when at least one other tablet in the status map has GTID based relay log positions", alias) + } + } + + // Create relevant position list of errant GTID based positions for later comparison. 
+ positionMap := make(map[string]mysql.Position) + for alias, status := range replicationStatusMap { + // Find errantGTIDs and clean them from status map if relevant. + if *gtidBased { + // We need to remove this status from a copy of the list, otherwise the diff will be empty always. + statusList := make([]*mysql.ReplicationStatus, 0, len(replicationStatusMap)-1) + for a, s := range replicationStatusMap { + if a != alias { + statusList = append(statusList, s) + } + } + relayLogGTIDSet, ok := status.RelayLogPosition.GTIDSet.(mysql.Mysql56GTIDSet) + if !ok { + return nil, vterrors.Errorf(vtrpcpb.Code_FAILED_PRECONDITION, "we got a filled in relay log position, but it's not of type Mysql56GTIDSet, even though we've determined we need to use GTID based assessment") + } + errantGTIDs, err := status.FindErrantGTIDs(statusList) + if err != nil { + // Could not find errant GTIDs when we must. + return nil, err + } + if len(errantGTIDs) != 0 { + // Skip inserting this tablet. It's not a valid candidate. 
+ continue + } + + pos := mysql.Position{GTIDSet: relayLogGTIDSet} + positionMap[alias] = pos + } else { + positionMap[alias] = status.Position + } + } + + for alias, masterStatus := range masterStatusMap { + executedPosition, err := mysql.DecodePosition(masterStatus.Position) + if err != nil { + return nil, vterrors.Wrapf(err, "could not decode a master status executed position for tablet %v: %v", alias, err) + } + positionMap[alias] = executedPosition + } + + return positionMap, nil +} + +func (wr *Wrangler) stopReplicationAndBuildStatusMaps(ctx context.Context, ev *events.Reparent, tabletMap map[string]*topo.TabletInfo, waitReplicasTimeout time.Duration, ignoredTablets sets.String) (map[string]*replicationdatapb.StopReplicationStatus, map[string]*replicationdatapb.MasterStatus, error) { + // Stop replication on all replicas, get their current + // replication position + event.DispatchUpdate(ev, "stop replication on all replicas") + statusMap := make(map[string]*replicationdatapb.StopReplicationStatus) + masterStatusMap := make(map[string]*replicationdatapb.MasterStatus) + mu := sync.Mutex{} + + errChan := make(chan error) + groupCtx, groupCancel := context.WithTimeout(ctx, waitReplicasTimeout) + defer groupCancel() + fillStatus := func(alias string, tabletInfo *topo.TabletInfo) { + err := vterrors.Errorf(vtrpcpb.Code_UNAVAILABLE, "fillStatus did not successfully complete") + defer func() { errChan <- err }() + + wr.logger.Infof("getting replication position from %v", alias) + var stopReplicationStatus *replicationdatapb.StopReplicationStatus + _, stopReplicationStatus, err = wr.tmc.StopReplicationAndGetStatus(groupCtx, tabletInfo.Tablet, replicationdatapb.StopReplicationMode_IOTHREADONLY) + switch err { + case mysql.ErrNotReplica: + var masterStatus *replicationdatapb.MasterStatus + masterStatus, err = wr.tmc.DemoteMaster(groupCtx, tabletInfo.Tablet) + if err != nil { + wr.logger.Warningf("replica %v thinks it's master but we failed to demote it", alias) + err = 
vterrors.Wrapf(err, "replica %v thinks it's master but we failed to demote it: %v", alias, err) + return + } + mu.Lock() + masterStatusMap[alias] = masterStatus + mu.Unlock() + + case nil: + mu.Lock() + statusMap[alias] = stopReplicationStatus + mu.Unlock() + + default: + wr.logger.Warningf("failed to get replication status from %v: %v", alias, err) + err = vterrors.Wrapf(err, "error when getting replication status for alias %v: %v", alias, err) + } + } + + for alias, tabletInfo := range tabletMap { + if !ignoredTablets.Has(alias) { + go fillStatus(alias, tabletInfo) + } + } + + errRecorder := waitOnNMinusOneTablets(groupCancel, len(tabletMap)-ignoredTablets.Len(), errChan, 1) + + if len(errRecorder.Errors) > 1 { + return nil, nil, vterrors.Wrapf(errRecorder.Error(), "encountered more than one error when trying to stop replication and get positions: %v", errRecorder.Error()) + } + return statusMap, masterStatusMap, nil +} + +// WaitForRelayLogsToApply will block execution waiting for the given tablet's relay logs to apply, unless the supplied +// context is cancelled, or waitReplicasTimeout is exceeded. 
+func (wr *Wrangler) WaitForRelayLogsToApply(ctx context.Context, tabletInfo *topo.TabletInfo, status *replicationdatapb.StopReplicationStatus) error { + var err error + if status.After.RelayLogPosition != "" { + err = wr.tmc.WaitForPosition(ctx, tabletInfo.Tablet, status.After.RelayLogPosition) + } else { + err = wr.tmc.WaitForPosition(ctx, tabletInfo.Tablet, status.After.FileRelayLogPosition) + } + return err } // TabletExternallyReparented changes the type of new master for this shard to MASTER diff --git a/go/vt/wrangler/testlib/emergency_reparent_shard_test.go b/go/vt/wrangler/testlib/emergency_reparent_shard_test.go index c6a7566011d..1216b19501a 100644 --- a/go/vt/wrangler/testlib/emergency_reparent_shard_test.go +++ b/go/vt/wrangler/testlib/emergency_reparent_shard_test.go @@ -25,6 +25,7 @@ import ( "github.com/stretchr/testify/assert" "github.com/stretchr/testify/require" + "k8s.io/apimachinery/pkg/util/sets" "vitess.io/vitess/go/mysql" "vitess.io/vitess/go/vt/logutil" @@ -54,6 +55,21 @@ func TestEmergencyReparentShard(t *testing.T) { goodReplica1 := NewFakeTablet(t, wr, "cell1", 2, topodatapb.TabletType_REPLICA, nil) goodReplica2 := NewFakeTablet(t, wr, "cell2", 3, topodatapb.TabletType_REPLICA, nil) + oldMaster.FakeMysqlDaemon.Replicating = false + oldMaster.FakeMysqlDaemon.CurrentMasterPosition = mysql.Position{ + GTIDSet: mysql.MariadbGTIDSet{ + 2: mysql.MariadbGTID{ + Domain: 2, + Server: 123, + Sequence: 456, + }, + }, + } + currentMasterFilePosition, _ := mysql.ParseFilePosGTIDSet("mariadb-bin.000010:456") + oldMaster.FakeMysqlDaemon.CurrentMasterFilePosition = mysql.Position{ + GTIDSet: currentMasterFilePosition, + } + // new master newMaster.FakeMysqlDaemon.ReadOnly = true newMaster.FakeMysqlDaemon.Replicating = true @@ -66,8 +82,13 @@ func TestEmergencyReparentShard(t *testing.T) { }, }, } + newMasterRelayLogPos, _ := mysql.ParseFilePosGTIDSet("relay-bin.000004:456") + newMaster.FakeMysqlDaemon.CurrentMasterFilePosition = mysql.Position{ + 
GTIDSet: newMasterRelayLogPos, + } + newMaster.FakeMysqlDaemon.WaitMasterPosition = newMaster.FakeMysqlDaemon.CurrentMasterFilePosition newMaster.FakeMysqlDaemon.ExpectedExecuteSuperQueryList = []string{ - "STOP SLAVE", + "STOP SLAVE IO_THREAD", "CREATE DATABASE IF NOT EXISTS _vt", "SUBCREATE TABLE IF NOT EXISTS _vt.reparent_journal", "SUBINSERT INTO _vt.reparent_journal (time_created_ns, action_name, master_alias, replication_position) VALUES", @@ -86,6 +107,11 @@ func TestEmergencyReparentShard(t *testing.T) { // old master, will be scrapped oldMaster.FakeMysqlDaemon.ReadOnly = false + oldMaster.FakeMysqlDaemon.ReplicationStatusError = mysql.ErrNotReplica + oldMaster.FakeMysqlDaemon.SetMasterInput = topoproto.MysqlAddr(newMaster.Tablet) + oldMaster.FakeMysqlDaemon.ExpectedExecuteSuperQueryList = []string{ + "STOP SLAVE", + } oldMaster.StartActionLoop(t, wr) defer oldMaster.StopActionLoop(t) @@ -101,8 +127,14 @@ func TestEmergencyReparentShard(t *testing.T) { }, }, } + goodReplica1RelayLogPos, _ := mysql.ParseFilePosGTIDSet("relay-bin.000004:455") + goodReplica1.FakeMysqlDaemon.CurrentMasterFilePosition = mysql.Position{ + GTIDSet: goodReplica1RelayLogPos, + } + goodReplica1.FakeMysqlDaemon.WaitMasterPosition = goodReplica1.FakeMysqlDaemon.CurrentMasterFilePosition goodReplica1.FakeMysqlDaemon.SetMasterInput = topoproto.MysqlAddr(newMaster.Tablet) goodReplica1.FakeMysqlDaemon.ExpectedExecuteSuperQueryList = []string{ + "STOP SLAVE IO_THREAD", "STOP SLAVE", "FAKE SET MASTER", "START SLAVE", @@ -122,6 +154,11 @@ func TestEmergencyReparentShard(t *testing.T) { }, }, } + goodReplica2RelayLogPos, _ := mysql.ParseFilePosGTIDSet("relay-bin.000004:454") + goodReplica2.FakeMysqlDaemon.CurrentMasterFilePosition = mysql.Position{ + GTIDSet: goodReplica2RelayLogPos, + } + goodReplica2.FakeMysqlDaemon.WaitMasterPosition = goodReplica2.FakeMysqlDaemon.CurrentMasterFilePosition goodReplica2.FakeMysqlDaemon.SetMasterInput = topoproto.MysqlAddr(newMaster.Tablet) 
goodReplica2.StartActionLoop(t, wr) goodReplica2.FakeMysqlDaemon.ExpectedExecuteSuperQueryList = []string{ @@ -157,13 +194,15 @@ func TestEmergencyReparentShard(t *testing.T) { // TestEmergencyReparentShardMasterElectNotBest tries to emergency reparent // to a host that is not the latest in replication position. func TestEmergencyReparentShardMasterElectNotBest(t *testing.T) { + ctx, cancel := context.WithTimeout(context.Background(), time.Second*30) + defer cancel() + delay := discovery.GetTabletPickerRetryDelay() defer func() { discovery.SetTabletPickerRetryDelay(delay) }() discovery.SetTabletPickerRetryDelay(5 * time.Millisecond) - ctx := context.Background() ts := memorytopo.NewServer("cell1", "cell2") wr := wrangler.New(logutil.NewConsoleLogger(), ts, tmclient.NewTabletManagerClient()) @@ -195,13 +234,19 @@ func TestEmergencyReparentShardMasterElectNotBest(t *testing.T) { }, }, } + newMasterRelayLogPos, _ := mysql.ParseFilePosGTIDSet("relay-bin.000004:456") + newMaster.FakeMysqlDaemon.CurrentMasterFilePosition = mysql.Position{ + GTIDSet: newMasterRelayLogPos, + } + newMaster.FakeMysqlDaemon.WaitMasterPosition = newMaster.FakeMysqlDaemon.CurrentMasterFilePosition newMaster.FakeMysqlDaemon.ExpectedExecuteSuperQueryList = []string{ - "STOP SLAVE", + "STOP SLAVE IO_THREAD", } newMaster.StartActionLoop(t, wr) defer newMaster.StopActionLoop(t) // old master, will be scrapped + oldMaster.FakeMysqlDaemon.ReplicationStatusError = mysql.ErrNotReplica oldMaster.StartActionLoop(t, wr) defer oldMaster.StopActionLoop(t) @@ -227,16 +272,23 @@ func TestEmergencyReparentShardMasterElectNotBest(t *testing.T) { }, }, } + moreAdvancedReplicaLogPos, _ := mysql.ParseFilePosGTIDSet("relay-bin.000004:457") + moreAdvancedReplica.FakeMysqlDaemon.CurrentMasterFilePosition = mysql.Position{ + GTIDSet: moreAdvancedReplicaLogPos, + } + moreAdvancedReplica.FakeMysqlDaemon.WaitMasterPosition = moreAdvancedReplica.FakeMysqlDaemon.CurrentMasterFilePosition 
moreAdvancedReplica.FakeMysqlDaemon.ExpectedExecuteSuperQueryList = []string{ - "STOP SLAVE", + "STOP SLAVE IO_THREAD", } moreAdvancedReplica.StartActionLoop(t, wr) defer moreAdvancedReplica.StopActionLoop(t) // run EmergencyReparentShard - err := wr.EmergencyReparentShard(ctx, newMaster.Tablet.Keyspace, newMaster.Tablet.Shard, newMaster.Tablet.Alias, 10*time.Second) + err := wr.EmergencyReparentShard(ctx, newMaster.Tablet.Keyspace, newMaster.Tablet.Shard, newMaster.Tablet.Alias, 10*time.Second, sets.NewString()) + cancel() + assert.Error(t, err) - assert.Contains(t, err.Error(), "is more advanced than master elect tablet") + assert.Contains(t, err.Error(), "is not fully caught up") // check what was run err = newMaster.FakeMysqlDaemon.CheckSuperQueryList() require.NoError(t, err)