From 1839e73151c111faca3be42b58d2a1316ab5f1f6 Mon Sep 17 00:00:00 2001 From: Peter Farr Date: Thu, 16 Jul 2020 15:22:24 -0700 Subject: [PATCH 01/26] Finished strategy steps 1-4 from main refactor issue. Signed-off-by: Peter Farr --- go/vt/wrangler/reparent.go | 281 +++++++++++++++++++++++++++---------- 1 file changed, 208 insertions(+), 73 deletions(-) diff --git a/go/vt/wrangler/reparent.go b/go/vt/wrangler/reparent.go index 98ff80bed62..ab59b7a335f 100644 --- a/go/vt/wrangler/reparent.go +++ b/go/vt/wrangler/reparent.go @@ -24,8 +24,12 @@ import ( "context" "fmt" "sync" + "sync/atomic" "time" + "k8s.io/apimachinery/pkg/util/sets" + "k8s.io/utils/pointer" + "vitess.io/vitess/go/event" "vitess.io/vitess/go/mysql" "vitess.io/vitess/go/sqlescape" @@ -887,88 +891,32 @@ func (wr *Wrangler) emergencyReparentShardLocked(ctx context.Context, ev *events return fmt.Errorf("master-elect tablet %v is already the master", topoproto.TabletAliasString(masterElectTabletAlias)) } - // Deal with the old master: try to remote-scrap it, if it's - // truly dead we force-scrap it. Remove it from our map in any case. 
- if shardInfo.HasMaster() { - deleteOldMaster := true - shardInfoMasterAliasStr := topoproto.TabletAliasString(shardInfo.MasterAlias) - oldMasterTabletInfo, ok := tabletMap[shardInfoMasterAliasStr] - if ok { - delete(tabletMap, shardInfoMasterAliasStr) - } else { - oldMasterTabletInfo, err = wr.ts.GetTablet(ctx, shardInfo.MasterAlias) - if err != nil { - wr.logger.Warningf("cannot read old master tablet %v, won't touch it: %v", shardInfoMasterAliasStr, err) - deleteOldMaster = false - } - } - - if deleteOldMaster { - ev.OldMaster = *oldMasterTabletInfo.Tablet - wr.logger.Infof("deleting old master %v", shardInfoMasterAliasStr) - - ctx, cancel := context.WithTimeout(ctx, waitReplicasTimeout) - defer cancel() - - if err := topotools.DeleteTablet(ctx, wr.ts, oldMasterTabletInfo.Tablet); err != nil { - wr.logger.Warningf("failed to delete old master tablet %v: %v", shardInfoMasterAliasStr, err) - } - } - } - - // Stop replication on all replicas, get their current - // replication position - event.DispatchUpdate(ev, "stop replication on all replicas") - wg := sync.WaitGroup{} - mu := sync.Mutex{} - statusMap := make(map[string]*replicationdatapb.StopReplicationStatus) - for alias, tabletInfo := range tabletMap { - wg.Add(1) - go func(alias string, tabletInfo *topo.TabletInfo) { - defer wg.Done() - wr.logger.Infof("getting replication position from %v", alias) - ctx, cancel := context.WithTimeout(ctx, waitReplicasTimeout) - defer cancel() - // TODO: Once we refactor EmergencyReparent, change the stopReplicationOption argument to IOThreadOnly. 
- _, stopReplicationStatus, err := wr.tmc.StopReplicationAndGetStatus(ctx, tabletInfo.Tablet, replicationdatapb.StopReplicationMode_IOANDSQLTHREAD) - if err != nil { - wr.logger.Warningf("failed to get replication status from %v, ignoring tablet: %v", alias, err) - return - } - mu.Lock() - statusMap[alias] = stopReplicationStatus - mu.Unlock() - }(alias, tabletInfo) + statusMap, masterStatusMap, err := wr.stopReplicationAndBuildStatusMaps(ctx, ev, tabletMap, waitReplicasTimeout) + if err != nil { + return err } - wg.Wait() // Check we still have the topology lock. if err := topo.CheckShardLocked(ctx, keyspace, shard); err != nil { return fmt.Errorf("lost topology lock, aborting: %v", err) } - // Verify masterElect is alive and has the most advanced position - masterElectStatus, ok := statusMap[masterElectTabletAliasStr] - if !ok { - return fmt.Errorf("couldn't get master elect %v replication position", topoproto.TabletAliasString(masterElectTabletAlias)) - } - masterElectStrPos := masterElectStatus.After.Position - masterElectPos, err := mysql.DecodePosition(masterElectStrPos) + validCandidates, err := wr.findValidReparentCandidates(statusMap, masterStatusMap) if err != nil { - return fmt.Errorf("cannot decode master elect position %v: %v", masterElectStrPos, err) + return err } - for alias, status := range statusMap { - if alias == masterElectTabletAliasStr { - continue - } - posStr := status.After.Position - pos, err := mysql.DecodePosition(posStr) - if err != nil { - return fmt.Errorf("cannot decode replica %v position %v: %v", alias, posStr, err) - } - if !masterElectPos.AtLeast(pos) { - return fmt.Errorf("tablet %v is more advanced than master elect tablet %v: %v > %v", alias, masterElectTabletAliasStr, posStr, masterElectStrPos) - } + if len(validCandidates) == 0 { + return fmt.Errorf("no valid candidates for emergency reparent") + } + validCandidatesSet := sets.NewString(validCandidates...) 
+ if !validCandidatesSet.Has(masterElectTabletAliasStr) { + return fmt.Errorf("master elect is either not fully caught up, or has errant GTIDs") + } + + // Wait for masterElect to catch up. + err = wr.WaitForRelayLogsToApply(ctx, masterElectTabletInfo, waitReplicasTimeout) + if err != nil { + return err } // Promote the masterElect @@ -1045,6 +993,193 @@ func (wr *Wrangler) emergencyReparentShardLocked(ctx context.Context, ev *events return nil } +// findValidReparentCandidates will find valid candidates for emergency reparent, and if successful, returning them as a list of tablet aliases. +func (wr *Wrangler) findValidReparentCandidates(statusMap map[string]*replicationdatapb.StopReplicationStatus, masterStatusMap map[string]*replicationdatapb.MasterStatus) ([]string, error) { + // Build out replication status list from proto types. + replicationStatusMap := make(map[string]*mysql.ReplicationStatus, len(statusMap)) + for alias, protoStatus := range statusMap { + status := mysql.ProtoToReplicationStatus(protoStatus.After) + replicationStatusMap[alias] = &status + } + + // Determine if we need to find errant GTIDs. + var gtidBased *bool + for _, status := range replicationStatusMap { + if gtidBased == nil { + gtidBased = pointer.BoolPtr(!status.RelayLogPosition.IsZero()) + } else if !*gtidBased { + break + } else if status.RelayLogPosition.IsZero() { + // Bail. We have an odd one in the bunch. + return nil, fmt.Errorf("encountered tablet with no relay log position, when at least one other tablet in the status map has a relay log positions") + } + } + + // Create relevant position list for comparison. + positionMap := make(map[string]mysql.Position) + badTablets := sets.NewString() + for alias, status := range replicationStatusMap { + // Find errantGTIDs and clean them from status map if relevant. + if *gtidBased { + // We need to remove this status from a copy of the list, otherwise the diff will be empty always. 
+ statusList := make([]*mysql.ReplicationStatus, 0, len(replicationStatusMap)-1) + for a, s := range replicationStatusMap { + if a != alias { + statusList = append(statusList, s) + } + } + errantGTIDs, err := status.FindErrantGTIDs(statusList) + if err != nil { + // Could not find errant GTIDs when we must. + return nil, err + } + // We'll keep track of bad replicas. Errant free these can still be used to determine high water mark + // but should not be considered for emergency reparent. + if len(errantGTIDs) != 0 { + badTablets.Insert(alias) + } + + relayLogGTIDSet, ok := status.RelayLogPosition.GTIDSet.(mysql.Mysql56GTIDSet) + if !ok { + return nil, fmt.Errorf("we got a filled in relay log position for a tablet, even though the GTIDSet is not of type Mysql56GTIDSet. We don't know how this could happen") + } + cleanedGTIDSet := relayLogGTIDSet.Difference(errantGTIDs) + cleanedRelayLogPosition := mysql.Position{GTIDSet: cleanedGTIDSet} + positionMap[alias] = cleanedRelayLogPosition + } else { + positionMap[alias] = status.FileRelayLogPosition + } + } + + for alias, masterStatus := range masterStatusMap { + executedPosition, err := mysql.DecodePosition(masterStatus.Position) + if err != nil { + return nil, err + } + positionMap[alias] = executedPosition + } + + if !*gtidBased { + // If all positions are not comparable, we can't do file based comparisons at all. + filePositionList := make([]mysql.Position, 0, len(replicationStatusMap)) + for _, p := range positionMap { + filePositionList = append(filePositionList, p) + } + if !mysql.AllPositionsComparable(filePositionList) { + return nil, fmt.Errorf("we can't compare all file based positions") + } + } + + // Find a position that is fully caught up when errant GTIDs are taken out of the equation. 
+ var winningPosition mysql.Position + for _, position := range positionMap { + if position.AtLeast(winningPosition) { + winningPosition = position + } + } + + // Now that we have a high water mark, we can get a list of all tablets that are errant GTID free + // that hit this high water mark. + var validCandidates []string + for alias, position := range positionMap { + if badTablets.Has(alias) { + // We should exclude tablets we've already determined have errant GTIDs from consideration. + continue + } + + if position.AtLeast(winningPosition) { + validCandidates = append(validCandidates, alias) + } + } + + return validCandidates, nil +} + +func (wr *Wrangler) stopReplicationAndBuildStatusMaps(ctx context.Context, ev *events.Reparent, tabletMap map[string]*topo.TabletInfo, waitReplicasTimeout time.Duration) (map[string]*replicationdatapb.StopReplicationStatus, map[string]*replicationdatapb.MasterStatus, error) { + // Stop replication on all replicas, get their current + // replication position + wg := sync.WaitGroup{} + mu := sync.Mutex{} + var errCounter uint32 + + event.DispatchUpdate(ev, "stop replication on all replicas") + statusMap := make(map[string]*replicationdatapb.StopReplicationStatus) + masterStatusMap := make(map[string]*replicationdatapb.MasterStatus) + + groupCtx, groupCancel := context.WithTimeout(ctx, waitReplicasTimeout) + defer groupCancel() + for alias, tabletInfo := range tabletMap { + wg.Add(1) + go func(alias string, tabletInfo *topo.TabletInfo) { + defer wg.Done() + wr.logger.Infof("getting replication position from %v", alias) + ctx, cancel := context.WithTimeout(groupCtx, waitReplicasTimeout) + defer cancel() + _, stopReplicationStatus, err := wr.tmc.StopReplicationAndGetStatus(ctx, tabletInfo.Tablet, replicationdatapb.StopReplicationMode_IOTHREADONLY) + if err != nil { + if err != mysql.ErrNotReplica { + wr.logger.Warningf("failed to get replication status from %v, ignoring tablet: %v", alias, err) + atomic.AddUint32(&errCounter, 1) + if 
atomic.LoadUint32(&errCounter) > 1 { + groupCancel() + } + return + } + + masterStatus, err := wr.tmc.DemoteMaster(ctx, tabletInfo.Tablet) + if err != nil { + wr.logger.Warningf("replica %v thinks it's master but we failed to demote it", alias) + atomic.AddUint32(&errCounter, 1) + if atomic.LoadUint32(&errCounter) > 1 { + groupCancel() + } + return + } + masterStatusMap[alias] = masterStatus + } + + mu.Lock() + statusMap[alias] = stopReplicationStatus + mu.Unlock() + }(alias, tabletInfo) + } + wg.Wait() + if errCounter > 1 { + return nil, nil, fmt.Errorf("encountered more than one error when trying to stop replication and get positions") + } + if (len(statusMap) + len(masterStatusMap)) != len(tabletMap)-1 { + return nil, nil, fmt.Errorf("did not hear back from all replicas when stopping replication to build status map") + } + return statusMap, masterStatusMap, nil +} + +// WaitForRelayLogsToApply will block execution waiting for the given tablets relay logs to apply, unless the supplied +// context is cancelled, or waitReplicasTimeout is exceeded. +func (wr *Wrangler) WaitForRelayLogsToApply(ctx context.Context, tabletInfo *topo.TabletInfo, waitReplicasTimeout time.Duration) error { + ctx, cancel := context.WithTimeout(ctx, waitReplicasTimeout) + defer cancel() + for { + status, err := wr.tmc.ReplicationStatus(ctx, tabletInfo.Tablet) + if err != nil { + if err == mysql.ErrNotReplica { + // This tablet thinks it's not a replica, so we have nothing to wait on as far as applying + // relay logs. 
+ return nil + } + return err + } + replicationStatus := mysql.ProtoToReplicationStatus(status) + if !replicationStatus.RelayLogPosition.IsZero() && replicationStatus.Position.AtLeast(replicationStatus.RelayLogPosition) { + return nil + } + if replicationStatus.RelayLogPosition.IsZero() && replicationStatus.FilePosition.AtLeast(replicationStatus.FileRelayLogPosition) { + return nil + } + + time.Sleep(2 * time.Second) + } +} + // TabletExternallyReparented changes the type of new master for this shard to MASTER // and updates it's tablet record in the topo. Updating the shard record is handled // by the new master tablet From 35c85a6a16d681c0a65f014ada310d87d1c9b1b8 Mon Sep 17 00:00:00 2001 From: Peter Farr Date: Thu, 16 Jul 2020 15:42:00 -0700 Subject: [PATCH 02/26] Updated test for new io_thread only logic. Signed-off-by: Peter Farr --- go/vt/wrangler/testlib/emergency_reparent_shard_test.go | 8 ++++---- 1 file changed, 4 insertions(+), 4 deletions(-) diff --git a/go/vt/wrangler/testlib/emergency_reparent_shard_test.go b/go/vt/wrangler/testlib/emergency_reparent_shard_test.go index c24c1dbe6eb..c83fb1903f2 100644 --- a/go/vt/wrangler/testlib/emergency_reparent_shard_test.go +++ b/go/vt/wrangler/testlib/emergency_reparent_shard_test.go @@ -59,7 +59,7 @@ func TestEmergencyReparentShard(t *testing.T) { }, } newMaster.FakeMysqlDaemon.ExpectedExecuteSuperQueryList = []string{ - "STOP SLAVE", + "STOP SLAVE IO_THREAD", "CREATE DATABASE IF NOT EXISTS _vt", "SUBCREATE TABLE IF NOT EXISTS _vt.reparent_journal", "SUBINSERT INTO _vt.reparent_journal (time_created_ns, action_name, master_alias, replication_position) VALUES", @@ -95,7 +95,7 @@ func TestEmergencyReparentShard(t *testing.T) { } goodReplica1.FakeMysqlDaemon.SetMasterInput = topoproto.MysqlAddr(newMaster.Tablet) goodReplica1.FakeMysqlDaemon.ExpectedExecuteSuperQueryList = []string{ - "STOP SLAVE", + "STOP SLAVE IO_THREAD", "FAKE SET MASTER", "START SLAVE", } @@ -182,7 +182,7 @@ func 
TestEmergencyReparentShardMasterElectNotBest(t *testing.T) { }, } newMaster.FakeMysqlDaemon.ExpectedExecuteSuperQueryList = []string{ - "STOP SLAVE", + "STOP SLAVE IO_THREAD", } newMaster.StartActionLoop(t, wr) defer newMaster.StopActionLoop(t) @@ -214,7 +214,7 @@ func TestEmergencyReparentShardMasterElectNotBest(t *testing.T) { }, } moreAdvancedReplica.FakeMysqlDaemon.ExpectedExecuteSuperQueryList = []string{ - "STOP SLAVE", + "STOP SLAVE IO_THREAD", } moreAdvancedReplica.StartActionLoop(t, wr) defer moreAdvancedReplica.StopActionLoop(t) From 62950ff27ab1955f334fbd82b30fbfe8473fb308 Mon Sep 17 00:00:00 2001 From: Peter Farr Date: Thu, 16 Jul 2020 16:14:11 -0700 Subject: [PATCH 03/26] Some tests are failing because master is not actually stopped. We should ensure that in this case getting N responses is ok, just not less than N-1 Signed-off-by: Peter Farr --- go/vt/wrangler/reparent.go | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/go/vt/wrangler/reparent.go b/go/vt/wrangler/reparent.go index ab59b7a335f..ec289b48eb4 100644 --- a/go/vt/wrangler/reparent.go +++ b/go/vt/wrangler/reparent.go @@ -1147,8 +1147,8 @@ func (wr *Wrangler) stopReplicationAndBuildStatusMaps(ctx context.Context, ev *e if errCounter > 1 { return nil, nil, fmt.Errorf("encountered more than one error when trying to stop replication and get positions") } - if (len(statusMap) + len(masterStatusMap)) != len(tabletMap)-1 { - return nil, nil, fmt.Errorf("did not hear back from all replicas when stopping replication to build status map") + if (len(statusMap) + len(masterStatusMap)) < len(tabletMap)-1 { + return nil, nil, fmt.Errorf("did not hear back from enough replicas when stopping replication to build status map") } return statusMap, masterStatusMap, nil } From 70f7273f39737e0b171debf39cc7f2c9e1785ee1 Mon Sep 17 00:00:00 2001 From: Peter Farr Date: Fri, 17 Jul 2020 15:09:01 -0700 Subject: [PATCH 04/26] Finally passing. Hallelujah. 
Signed-off-by: Peter Farr --- go/mysql/filepos_gtid.go | 6 +-- .../fakemysqldaemon/fakemysqldaemon.go | 12 +++-- go/vt/wrangler/reparent.go | 53 ++++++++++++------- .../testlib/emergency_reparent_shard_test.go | 46 +++++++++++++++- 4 files changed, 90 insertions(+), 27 deletions(-) diff --git a/go/mysql/filepos_gtid.go b/go/mysql/filepos_gtid.go index 8a276f0a018..eb55bc33e3a 100644 --- a/go/mysql/filepos_gtid.go +++ b/go/mysql/filepos_gtid.go @@ -44,8 +44,8 @@ func parseFilePosGTID(s string) (GTID, error) { }, nil } -// parseFilePosGTIDSet is registered as a GTIDSet parser. -func parseFilePosGTIDSet(s string) (GTIDSet, error) { +// ParseFilePosGTIDSet is registered as a GTIDSet parser. +func ParseFilePosGTIDSet(s string) (GTIDSet, error) { gtid, err := parseFilePosGTID(s) if err != nil { return nil, err @@ -156,6 +156,6 @@ func (gtid filePosGTID) Last() string { func init() { gtidParsers[FilePosFlavorID] = parseFilePosGTID - gtidSetParsers[FilePosFlavorID] = parseFilePosGTIDSet + gtidSetParsers[FilePosFlavorID] = ParseFilePosGTIDSet flavors[FilePosFlavorID] = newFilePosFlavor } diff --git a/go/vt/mysqlctl/fakemysqldaemon/fakemysqldaemon.go b/go/vt/mysqlctl/fakemysqldaemon/fakemysqldaemon.go index 7707ac63507..f4142ffee20 100644 --- a/go/vt/mysqlctl/fakemysqldaemon/fakemysqldaemon.go +++ b/go/vt/mysqlctl/fakemysqldaemon/fakemysqldaemon.go @@ -65,6 +65,9 @@ type FakeMysqlDaemon struct { // and ReplicationStatus CurrentMasterPosition mysql.Position + // CurrentMasterFilePosition is used to determine the executed file based positioning of the master. 
+ CurrentMasterFilePosition mysql.Position + // ReplicationStatusError is used by ReplicationStatus ReplicationStatusError error @@ -225,8 +228,10 @@ func (fmd *FakeMysqlDaemon) ReplicationStatus() (mysql.ReplicationStatus, error) return mysql.ReplicationStatus{}, fmd.ReplicationStatusError } return mysql.ReplicationStatus{ - Position: fmd.CurrentMasterPosition, - SecondsBehindMaster: fmd.SecondsBehindMaster, + Position: fmd.CurrentMasterPosition, + FilePosition: fmd.CurrentMasterFilePosition, + FileRelayLogPosition: fmd.CurrentMasterFilePosition, + SecondsBehindMaster: fmd.SecondsBehindMaster, // implemented as AND to avoid changing all tests that were // previously using Replicating = false IOThreadRunning: fmd.Replicating && fmd.IOThreadRunning, @@ -242,7 +247,8 @@ func (fmd *FakeMysqlDaemon) MasterStatus(ctx context.Context) (mysql.MasterStatu return mysql.MasterStatus{}, fmd.MasterStatusError } return mysql.MasterStatus{ - Position: fmd.CurrentMasterPosition, + Position: fmd.CurrentMasterPosition, + FilePosition: fmd.CurrentMasterFilePosition, }, nil } diff --git a/go/vt/wrangler/reparent.go b/go/vt/wrangler/reparent.go index ec289b48eb4..8054f61bf33 100644 --- a/go/vt/wrangler/reparent.go +++ b/go/vt/wrangler/reparent.go @@ -1004,14 +1004,14 @@ func (wr *Wrangler) findValidReparentCandidates(statusMap map[string]*replicatio // Determine if we need to find errant GTIDs. var gtidBased *bool - for _, status := range replicationStatusMap { + for alias, status := range replicationStatusMap { if gtidBased == nil { gtidBased = pointer.BoolPtr(!status.RelayLogPosition.IsZero()) } else if !*gtidBased { break } else if status.RelayLogPosition.IsZero() { // Bail. We have an odd one in the bunch. 
- return nil, fmt.Errorf("encountered tablet with no relay log position, when at least one other tablet in the status map has a relay log positions") + return nil, fmt.Errorf("encountered tablet %v with no relay log position, when at least one other tablet in the status map has a relay log positions", alias) } } @@ -1052,11 +1052,19 @@ func (wr *Wrangler) findValidReparentCandidates(statusMap map[string]*replicatio } for alias, masterStatus := range masterStatusMap { - executedPosition, err := mysql.DecodePosition(masterStatus.Position) - if err != nil { - return nil, err + if *gtidBased { + executedPosition, err := mysql.DecodePosition(masterStatus.Position) + if err != nil { + return nil, err + } + positionMap[alias] = executedPosition + } else { + executedFilePosition, err := mysql.DecodePosition(masterStatus.FilePosition) + if err != nil { + return nil, err + } + positionMap[alias] = executedFilePosition } - positionMap[alias] = executedPosition } if !*gtidBased { @@ -1073,6 +1081,9 @@ func (wr *Wrangler) findValidReparentCandidates(statusMap map[string]*replicatio // Find a position that is fully caught up when errant GTIDs are taken out of the equation. 
var winningPosition mysql.Position for _, position := range positionMap { + if winningPosition.IsZero() { + winningPosition = position + } if position.AtLeast(winningPosition) { winningPosition = position } @@ -1116,16 +1127,8 @@ func (wr *Wrangler) stopReplicationAndBuildStatusMaps(ctx context.Context, ev *e ctx, cancel := context.WithTimeout(groupCtx, waitReplicasTimeout) defer cancel() _, stopReplicationStatus, err := wr.tmc.StopReplicationAndGetStatus(ctx, tabletInfo.Tablet, replicationdatapb.StopReplicationMode_IOTHREADONLY) - if err != nil { - if err != mysql.ErrNotReplica { - wr.logger.Warningf("failed to get replication status from %v, ignoring tablet: %v", alias, err) - atomic.AddUint32(&errCounter, 1) - if atomic.LoadUint32(&errCounter) > 1 { - groupCancel() - } - return - } - + switch err { + case mysql.ErrNotReplica: masterStatus, err := wr.tmc.DemoteMaster(ctx, tabletInfo.Tablet) if err != nil { wr.logger.Warningf("replica %v thinks it's master but we failed to demote it", alias) @@ -1135,12 +1138,22 @@ func (wr *Wrangler) stopReplicationAndBuildStatusMaps(ctx context.Context, ev *e } return } + mu.Lock() masterStatusMap[alias] = masterStatus + mu.Unlock() + + case nil: + mu.Lock() + statusMap[alias] = stopReplicationStatus + mu.Unlock() + + default: + wr.logger.Warningf("failed to get replication status from %v, ignoring tablet: %v", alias, err) + atomic.AddUint32(&errCounter, 1) + if atomic.LoadUint32(&errCounter) > 1 { + groupCancel() + } } - - mu.Lock() - statusMap[alias] = stopReplicationStatus - mu.Unlock() }(alias, tabletInfo) } wg.Wait() diff --git a/go/vt/wrangler/testlib/emergency_reparent_shard_test.go b/go/vt/wrangler/testlib/emergency_reparent_shard_test.go index c83fb1903f2..fd01253c8cd 100644 --- a/go/vt/wrangler/testlib/emergency_reparent_shard_test.go +++ b/go/vt/wrangler/testlib/emergency_reparent_shard_test.go @@ -46,6 +46,21 @@ func TestEmergencyReparentShard(t *testing.T) { goodReplica1 := NewFakeTablet(t, wr, "cell1", 2, 
topodatapb.TabletType_REPLICA, nil) goodReplica2 := NewFakeTablet(t, wr, "cell2", 3, topodatapb.TabletType_REPLICA, nil) + oldMaster.FakeMysqlDaemon.Replicating = false + oldMaster.FakeMysqlDaemon.CurrentMasterPosition = mysql.Position{ + GTIDSet: mysql.MariadbGTIDSet{ + 2: mysql.MariadbGTID{ + Domain: 2, + Server: 123, + Sequence: 456, + }, + }, + } + currentMasterFilePosition, _ := mysql.ParseFilePosGTIDSet("mariadb-bin.000010:456") + oldMaster.FakeMysqlDaemon.CurrentMasterFilePosition = mysql.Position{ + GTIDSet: currentMasterFilePosition, + } + // new master newMaster.FakeMysqlDaemon.ReadOnly = true newMaster.FakeMysqlDaemon.Replicating = true @@ -58,6 +73,10 @@ func TestEmergencyReparentShard(t *testing.T) { }, }, } + newMasterRelayLogPos, _ := mysql.ParseFilePosGTIDSet("relay-bin.000004:456") + newMaster.FakeMysqlDaemon.CurrentMasterFilePosition = mysql.Position{ + GTIDSet: newMasterRelayLogPos, + } newMaster.FakeMysqlDaemon.ExpectedExecuteSuperQueryList = []string{ "STOP SLAVE IO_THREAD", "CREATE DATABASE IF NOT EXISTS _vt", @@ -78,6 +97,13 @@ func TestEmergencyReparentShard(t *testing.T) { // old master, will be scrapped oldMaster.FakeMysqlDaemon.ReadOnly = false + oldMaster.FakeMysqlDaemon.ReplicationStatusError = mysql.ErrNotReplica + oldMaster.FakeMysqlDaemon.SetMasterInput = topoproto.MysqlAddr(newMaster.Tablet) + oldMaster.FakeMysqlDaemon.ExpectedExecuteSuperQueryList = []string{ + "STOP SLAVE", + "FAKE SET MASTER", + "START SLAVE", + } oldMaster.StartActionLoop(t, wr) defer oldMaster.StopActionLoop(t) @@ -93,9 +119,14 @@ func TestEmergencyReparentShard(t *testing.T) { }, }, } + goodReplica1RelayLogPos, _ := mysql.ParseFilePosGTIDSet("relay-bin.000004:455") + goodReplica1.FakeMysqlDaemon.CurrentMasterFilePosition = mysql.Position{ + GTIDSet: goodReplica1RelayLogPos, + } goodReplica1.FakeMysqlDaemon.SetMasterInput = topoproto.MysqlAddr(newMaster.Tablet) goodReplica1.FakeMysqlDaemon.ExpectedExecuteSuperQueryList = []string{ "STOP SLAVE IO_THREAD", + 
"STOP SLAVE", "FAKE SET MASTER", "START SLAVE", } @@ -114,6 +145,10 @@ func TestEmergencyReparentShard(t *testing.T) { }, }, } + goodReplica2RelayLogPos, _ := mysql.ParseFilePosGTIDSet("relay-bin.000004:454") + goodReplica2.FakeMysqlDaemon.CurrentMasterFilePosition = mysql.Position{ + GTIDSet: goodReplica2RelayLogPos, + } goodReplica2.FakeMysqlDaemon.SetMasterInput = topoproto.MysqlAddr(newMaster.Tablet) goodReplica2.StartActionLoop(t, wr) goodReplica2.FakeMysqlDaemon.ExpectedExecuteSuperQueryList = []string{ @@ -181,6 +216,10 @@ func TestEmergencyReparentShardMasterElectNotBest(t *testing.T) { }, }, } + newMasterRelayLogPos, _ := mysql.ParseFilePosGTIDSet("relay-bin.000004:456") + newMaster.FakeMysqlDaemon.CurrentMasterFilePosition = mysql.Position{ + GTIDSet: newMasterRelayLogPos, + } newMaster.FakeMysqlDaemon.ExpectedExecuteSuperQueryList = []string{ "STOP SLAVE IO_THREAD", } @@ -188,6 +227,7 @@ func TestEmergencyReparentShardMasterElectNotBest(t *testing.T) { defer newMaster.StopActionLoop(t) // old master, will be scrapped + oldMaster.FakeMysqlDaemon.ReplicationStatusError = mysql.ErrNotReplica oldMaster.StartActionLoop(t, wr) defer oldMaster.StopActionLoop(t) @@ -213,6 +253,10 @@ func TestEmergencyReparentShardMasterElectNotBest(t *testing.T) { }, }, } + moreAdvancedReplicaLogPos, _ := mysql.ParseFilePosGTIDSet("relay-bin.000004:457") + moreAdvancedReplica.FakeMysqlDaemon.CurrentMasterFilePosition = mysql.Position{ + GTIDSet: moreAdvancedReplicaLogPos, + } moreAdvancedReplica.FakeMysqlDaemon.ExpectedExecuteSuperQueryList = []string{ "STOP SLAVE IO_THREAD", } @@ -222,7 +266,7 @@ func TestEmergencyReparentShardMasterElectNotBest(t *testing.T) { // run EmergencyReparentShard err := wr.EmergencyReparentShard(ctx, newMaster.Tablet.Keyspace, newMaster.Tablet.Shard, newMaster.Tablet.Alias, 10*time.Second) assert.Error(t, err) - assert.Contains(t, err.Error(), "is more advanced than master elect tablet") + assert.Contains(t, err.Error(), "master elect is either not 
fully caught up, or") // check what was run err = newMaster.FakeMysqlDaemon.CheckSuperQueryList() require.NoError(t, err) From 2af7875d216f1f7e36466116b10676d95608a589 Mon Sep 17 00:00:00 2001 From: Peter Farr Date: Mon, 20 Jul 2020 11:21:05 -0700 Subject: [PATCH 05/26] Finished all major aspects of design, other than --ignore_unreachable_tablets flag, which I would like some discussion around. Signed-off-by: Peter Farr --- go.mod | 1 + go/test/endtoend/reparent/reparent_test.go | 147 ++++++++- go/vt/vtctl/reparent.go | 13 +- go/vt/wrangler/reparent.go | 311 +++++++++++------- .../testlib/emergency_reparent_shard_test.go | 1 + 5 files changed, 346 insertions(+), 127 deletions(-) diff --git a/go.mod b/go.mod index 1ff58dc17cc..327920585cd 100644 --- a/go.mod +++ b/go.mod @@ -96,5 +96,6 @@ require ( k8s.io/apiextensions-apiserver v0.17.3 k8s.io/apimachinery v0.17.3 k8s.io/client-go v0.17.3 + k8s.io/utils v0.0.0-20191114184206-e782cd3c129f sigs.k8s.io/yaml v1.1.0 ) diff --git a/go/test/endtoend/reparent/reparent_test.go b/go/test/endtoend/reparent/reparent_test.go index 33d3bd8a9bb..f895e048c96 100644 --- a/go/test/endtoend/reparent/reparent_test.go +++ b/go/test/endtoend/reparent/reparent_test.go @@ -25,12 +25,12 @@ import ( "testing" "time" - "vitess.io/vitess/go/vt/log" - "vitess.io/vitess/go/mysql" + "vitess.io/vitess/go/vt/log" "github.com/stretchr/testify/assert" "github.com/stretchr/testify/require" + "vitess.io/vitess/go/json2" "vitess.io/vitess/go/test/endtoend/cluster" querypb "vitess.io/vitess/go/vt/proto/query" @@ -119,9 +119,22 @@ func TestReparentDownMaster(t *testing.T) { "EmergencyReparentShard", "-keyspace_shard", keyspaceShard, "-new_master", tablet62044.Alias, - "-wait_replicas_timeout", "31s") + "-wait_replicas_timeout", "30s") require.NoError(t, err) + require.Nil(t, err) + // Check that old master tablet is left around for human intervention. 
+ err = clusterInstance.VtctlclientProcess.ExecuteCommand("Validate") + require.Error(t, err) + + // Now we'll manually remove it, simulating a human cleaning up a dead master. + err = clusterInstance.VtctlclientProcess.ExecuteCommand( + "DeleteTablet", + "-allow_master", + tablet62344.Alias) + require.Nil(t, err) + + // Now validate topo is correct. validateTopology(t, false) checkMasterTablet(t, tablet62044) @@ -152,6 +165,111 @@ func TestReparentDownMaster(t *testing.T) { killTablets(t) } +func TestReparentNoChoiceDownMaster(t *testing.T) { + defer cluster.PanicHandler(t) + ctx := context.Background() + + for _, tablet := range []cluster.Vttablet{*tablet62344, *tablet62044, *tablet41983, *tablet31981} { + // Create Database + err := tablet.VttabletProcess.CreateDB(keyspaceName) + require.Nil(t, err) + + // Reset status, don't wait for the tablet status. We will check it later + tablet.VttabletProcess.ServingStatus = "" + // Init Tablet + err = clusterInstance.VtctlclientProcess.InitTablet(&tablet, tablet.Cell, keyspaceName, hostname, shardName) + require.Nil(t, err) + + // Start the tablet + err = tablet.VttabletProcess.Setup() + require.Nil(t, err) + } + + for _, tablet := range []cluster.Vttablet{*tablet62344, *tablet62044, *tablet41983, *tablet31981} { + err := tablet.VttabletProcess.WaitForTabletTypes([]string{"SERVING", "NOT_SERVING"}) + require.Nil(t, err) + } + + // Init Shard Master + err := clusterInstance.VtctlclientProcess.ExecuteCommand("InitShardMaster", + "-force", fmt.Sprintf("%s/%s", keyspaceName, shardName), tablet62344.Alias) + require.Nil(t, err) + + validateTopology(t, true) + + // create Tables + runSQL(ctx, t, sqlSchema, tablet62344) + + // insert data into the old master, check the connected replica work + insertSQL1 := fmt.Sprintf(insertSQL, 2, 2) + runSQL(ctx, t, insertSQL1, tablet62344) + err = checkInsertedValues(ctx, t, tablet62044, 2) + require.Nil(t, err) + err = checkInsertedValues(ctx, t, tablet41983, 2) + require.Nil(t, err) + 
err = checkInsertedValues(ctx, t, tablet31981, 2) + require.Nil(t, err) + + // Make the current master agent and database unavailable. + err = tablet62344.VttabletProcess.TearDown() + require.Nil(t, err) + err = tablet62344.MysqlctlProcess.Stop() + require.Nil(t, err) + + // Run forced reparent operation, this should now proceed unimpeded. + err = clusterInstance.VtctlclientProcess.ExecuteCommand( + "EmergencyReparentShard", + "-keyspace_shard", keyspaceShard, + "-wait_replicas_timeout", "30s") + require.Nil(t, err) + + // Check that old master tablet is left around for human intervention. + err = clusterInstance.VtctlclientProcess.ExecuteCommand("Validate") + require.Error(t, err) + + // Now we'll manually remove the old master, simulating a human cleaning up a dead master. + err = clusterInstance.VtctlclientProcess.ExecuteCommand( + "DeleteTablet", + "-allow_master", + tablet62344.Alias) + require.Nil(t, err) + + // Now validate topo is correct. + validateTopology(t, false) + + var newMasterTablet *cluster.Vttablet + for _, tablet := range []*cluster.Vttablet{tablet62044, tablet41983, tablet31981} { + if isHealthyMasterTablet(t, tablet) { + newMasterTablet = tablet + break + } + } + require.NotNil(t, newMasterTablet) + + // Check new master has latest transaction. 
+ err = checkInsertedValues(ctx, t, newMasterTablet, 2) + require.Nil(t, err) + + // bring back the old master as a replica, check that it catches up + tablet62344.MysqlctlProcess.InitMysql = false + err = tablet62344.MysqlctlProcess.Start() + require.Nil(t, err) + err = clusterInstance.VtctlclientProcess.InitTablet(tablet62344, tablet62344.Cell, keyspaceName, hostname, shardName) + require.Nil(t, err) + + // As there is already a master the new replica will come directly in SERVING state + tablet62344.VttabletProcess.ServingStatus = "SERVING" + // Start the tablet + err = tablet62344.VttabletProcess.Setup() + require.Nil(t, err) + + err = checkInsertedValues(ctx, t, tablet62344, 2) + require.Nil(t, err) + + // Kill tablets + killTablets(t) +} + func TestReparentCrossCell(t *testing.T) { defer cluster.PanicHandler(t) @@ -820,7 +938,30 @@ func checkMasterTablet(t *testing.T, tablet *cluster.Vttablet) { assert.True(t, streamHealthResponse.GetServing()) tabletType := streamHealthResponse.GetTarget().GetTabletType() assert.Equal(t, topodatapb.TabletType_MASTER, tabletType) +} +// isHealthyMasterTablet will return if tablet is master AND healthy +func isHealthyMasterTablet(t *testing.T, tablet *cluster.Vttablet) bool { + result, err := clusterInstance.VtctlclientProcess.ExecuteCommandWithOutput("GetTablet", tablet.Alias) + require.Nil(t, err) + var tabletInfo topodatapb.Tablet + err = json2.Unmarshal([]byte(result), &tabletInfo) + require.Nil(t, err) + if tabletInfo.GetType() != topodatapb.TabletType_MASTER { + return false + } + + // make sure the health stream is updated + result, err = clusterInstance.VtctlclientProcess.ExecuteCommandWithOutput("VtTabletStreamHealth", "-count", "1", tablet.Alias) + require.Nil(t, err) + var streamHealthResponse querypb.StreamHealthResponse + + err = json2.Unmarshal([]byte(result), &streamHealthResponse) + require.Nil(t, err) + + assert.True(t, streamHealthResponse.GetServing()) + tabletType := 
streamHealthResponse.GetTarget().GetTabletType() + return tabletType == topodatapb.TabletType_MASTER } func checkInsertedValues(ctx context.Context, t *testing.T, tablet *cluster.Vttablet, index int) error { diff --git a/go/vt/vtctl/reparent.go b/go/vt/vtctl/reparent.go index dc56e9b898a..c1db88b3710 100644 --- a/go/vt/vtctl/reparent.go +++ b/go/vt/vtctl/reparent.go @@ -49,7 +49,7 @@ func init() { addCommand("Shards", command{ "EmergencyReparentShard", commandEmergencyReparentShard, - "-keyspace_shard= -new_master= [-wait_replicas_timeout=]", + "-keyspace_shard= [-new_master=] [-wait_replicas_timeout=]", "Reparents the shard to the new master. Assumes the old master is dead and not responsding."}) addCommand("Shards", command{ "TabletExternallyReparented", @@ -178,16 +178,19 @@ func commandEmergencyReparentShard(ctx context.Context, wr *wrangler.Wrangler, s *keyspaceShard = subFlags.Arg(0) *newMaster = subFlags.Arg(1) } else if subFlags.NArg() != 0 { - return fmt.Errorf("action EmergencyReparentShard requires -keyspace_shard= -new_master=") + return fmt.Errorf("action EmergencyReparentShard requires -keyspace_shard=") } keyspace, shard, err := topoproto.ParseKeyspaceShard(*keyspaceShard) if err != nil { return err } - tabletAlias, err := topoproto.ParseTabletAlias(*newMaster) - if err != nil { - return err + var tabletAlias *topodatapb.TabletAlias + if *newMaster != "" { + tabletAlias, err = topoproto.ParseTabletAlias(*newMaster) + if err != nil { + return err + } } return wr.EmergencyReparentShard(ctx, keyspace, shard, tabletAlias, *waitReplicasTimeout) } diff --git a/go/vt/wrangler/reparent.go b/go/vt/wrangler/reparent.go index 8054f61bf33..749c14f1381 100644 --- a/go/vt/wrangler/reparent.go +++ b/go/vt/wrangler/reparent.go @@ -24,7 +24,6 @@ import ( "context" "fmt" "sync" - "sync/atomic" "time" "k8s.io/apimachinery/pkg/util/sets" @@ -848,7 +847,11 @@ func (wr *Wrangler) chooseNewMaster( // the shard, when the old master is completely unreachable. 
func (wr *Wrangler) EmergencyReparentShard(ctx context.Context, keyspace, shard string, masterElectTabletAlias *topodatapb.TabletAlias, waitReplicasTimeout time.Duration) (err error) { // lock the shard - ctx, unlock, lockErr := wr.ts.LockShard(ctx, keyspace, shard, fmt.Sprintf("EmergencyReparentShard(%v)", topoproto.TabletAliasString(masterElectTabletAlias))) + actionMsg := "EmergencyReparentShard" + if masterElectTabletAlias != nil { + actionMsg = fmt.Sprintf("%v(%v)", actionMsg, topoproto.TabletAliasString(masterElectTabletAlias)) + } + ctx, unlock, lockErr := wr.ts.LockShard(ctx, keyspace, shard, actionMsg) if lockErr != nil { return lockErr } @@ -880,22 +883,15 @@ func (wr *Wrangler) emergencyReparentShardLocked(ctx context.Context, ev *events return err } - // Check invariants we're going to depend on. - masterElectTabletAliasStr := topoproto.TabletAliasString(masterElectTabletAlias) - masterElectTabletInfo, ok := tabletMap[masterElectTabletAliasStr] - if !ok { - return fmt.Errorf("master-elect tablet %v is not in the shard", masterElectTabletAliasStr) - } - ev.NewMaster = *masterElectTabletInfo.Tablet - if topoproto.TabletAliasEqual(shardInfo.MasterAlias, masterElectTabletAlias) { - return fmt.Errorf("master-elect tablet %v is already the master", topoproto.TabletAliasString(masterElectTabletAlias)) - } - statusMap, masterStatusMap, err := wr.stopReplicationAndBuildStatusMaps(ctx, ev, tabletMap, waitReplicasTimeout) if err != nil { return err } + if len(statusMap)+len(masterStatusMap) < len(tabletMap)-1 { + return fmt.Errorf("didn't get enough tablet responses to find a candidate garaunteed to have newest transaction") + } + // Check we still have the topology lock. 
if err := topo.CheckShardLocked(ctx, keyspace, shard); err != nil { return fmt.Errorf("lost topology lock, aborting: %v", err) @@ -908,26 +904,81 @@ func (wr *Wrangler) emergencyReparentShardLocked(ctx context.Context, ev *events if len(validCandidates) == 0 { return fmt.Errorf("no valid candidates for emergency reparent") } - validCandidatesSet := sets.NewString(validCandidates...) - if !validCandidatesSet.Has(masterElectTabletAliasStr) { - return fmt.Errorf("master elect is either not fully caught up, or has errant GTIDs") + + var newMasterTabletAliasStr string + if masterElectTabletAlias != nil { + masterElectTabletAliasStr := topoproto.TabletAliasString(masterElectTabletAlias) + masterElectTabletInfo, ok := tabletMap[masterElectTabletAliasStr] + if !ok { + return fmt.Errorf("master-elect tablet %v is not in the shard", masterElectTabletAliasStr) + } + ev.NewMaster = *masterElectTabletInfo.Tablet + if topoproto.TabletAliasEqual(shardInfo.MasterAlias, masterElectTabletAlias) { + return fmt.Errorf("master-elect tablet %v is already the master", topoproto.TabletAliasString(masterElectTabletAlias)) + } + validCandidatesSet := sets.NewString(validCandidates...) + if !validCandidatesSet.Has(masterElectTabletAliasStr) { + return fmt.Errorf("master elect is either not fully caught up, or has errant GTIDs") + } + + // Wait for masterElect to catch up. + err = wr.WaitForRelayLogsToApply(ctx, masterElectTabletInfo, statusMap[masterElectTabletAliasStr]) + if err != nil { + return err + } + newMasterTabletAliasStr = masterElectTabletAliasStr + } else { + // race. 
+ aliasChan := make(chan string) + groupCtx, groupCancel := context.WithCancel(ctx) + defer groupCancel() + for _, candidate := range validCandidates { + go func(alias string) { + var resultAlias string + done := func() { aliasChan <- resultAlias } + defer done() + ctx, cancel := context.WithCancel(groupCtx) + defer cancel() + + err = wr.WaitForRelayLogsToApply(ctx, tabletMap[alias], statusMap[alias]) + if err == nil { + resultAlias = alias + } + }(candidate) + } + + resultCounter := 0 + var winningTabletAlias string + for alias := range aliasChan { + resultCounter++ + if alias != "" { + winningTabletAlias = alias + groupCancel() + } + if resultCounter == len(validCandidates) { + break + } + } + if winningTabletAlias == "" { + return fmt.Errorf("could not find a valid candidate for new master that applied its relay logs under the provided wait_replicas_timeout") + } + newMasterTabletAliasStr = winningTabletAlias } - // Wait for masterElect to catch up. - err = wr.WaitForRelayLogsToApply(ctx, masterElectTabletInfo, waitReplicasTimeout) - if err != nil { - return err + // Check we still have the topology lock. + if err := topo.CheckShardLocked(ctx, keyspace, shard); err != nil { + return fmt.Errorf("lost topology lock, aborting: %v", err) } // Promote the masterElect - wr.logger.Infof("promote tablet %v to master", topoproto.TabletAliasString(masterElectTabletAlias)) + wr.logger.Infof("promote tablet %v to master", newMasterTabletAliasStr) event.DispatchUpdate(ev, "promoting replica") - rp, err := wr.tmc.PromoteReplica(ctx, masterElectTabletInfo.Tablet) + rp, err := wr.tmc.PromoteReplica(ctx, tabletMap[newMasterTabletAliasStr].Tablet) if err != nil { - return fmt.Errorf("master-elect tablet %v failed to be upgraded to master: %v", topoproto.TabletAliasString(masterElectTabletAlias), err) + return fmt.Errorf("master-elect tablet %v failed to be upgraded to master: %v", newMasterTabletAliasStr, err) } - // Check we stil have the topology lock. 
+ // Check we still have the topology lock. if err := topo.CheckShardLocked(ctx, keyspace, shard); err != nil { return fmt.Errorf("lost topology lock, aborting: %v", err) } @@ -944,55 +995,96 @@ func (wr *Wrangler) emergencyReparentShardLocked(ctx context.Context, ev *events // - everybody else: reparent to new master, wait for row event.DispatchUpdate(ev, "reparenting all tablets") now := time.Now().UnixNano() + errChan := make(chan error) wgMaster := sync.WaitGroup{} - wgReplicas := sync.WaitGroup{} - rec := concurrency.AllErrorRecorder{} var masterErr error + + handleMaster := func(alias string, tabletInfo *topo.TabletInfo) { + err := fmt.Errorf("handleMaster never finished execution for alias: %v", alias) + sendErr := func() { errChan <- err } + defer sendErr() + defer wgMaster.Done() + + wr.logger.Infof("populating reparent journal on new master %v", alias) + err = wr.tmc.PopulateReparentJournal(replCtx, tabletInfo.Tablet, now, emergencyReparentShardOperation, tabletMap[newMasterTabletAliasStr].Alias, rp) + if err != nil { + masterErr = err + } + } + handleReplicas := func(alias string, tabletInfo *topo.TabletInfo) { + var err error + sendErr := func() { errChan <- err } + defer sendErr() + + wr.logger.Infof("setting new master on replica %v", alias) + forceStart := false + if status, ok := statusMap[alias]; ok { + forceStart = replicaWasRunning(status) + } + err = wr.tmc.SetMaster(replCtx, tabletInfo.Tablet, tabletMap[newMasterTabletAliasStr].Alias, now, "", forceStart) + if err != nil { + err = fmt.Errorf("tablet %v SetMaster failed: %v", alias, err) + } + } + waitOnTablets := func() *concurrency.AllErrorRecorder { + return waitOnNMinusOneTablets(replCancel, len(tabletMap), errChan) + } + for alias, tabletInfo := range tabletMap { - if alias == masterElectTabletAliasStr { + if alias == newMasterTabletAliasStr { wgMaster.Add(1) - go func(alias string, tabletInfo *topo.TabletInfo) { - defer wgMaster.Done() - wr.logger.Infof("populating reparent journal on new 
master %v", alias) - masterErr = wr.tmc.PopulateReparentJournal(replCtx, tabletInfo.Tablet, now, emergencyReparentShardOperation, masterElectTabletAlias, rp) - }(alias, tabletInfo) + go handleMaster(alias, tabletInfo) } else { - wgReplicas.Add(1) - go func(alias string, tabletInfo *topo.TabletInfo) { - defer wgReplicas.Done() - wr.logger.Infof("setting new master on replica %v", alias) - forceStart := false - if status, ok := statusMap[alias]; ok { - forceStart = replicaWasRunning(status) - } - if err := wr.tmc.SetMaster(replCtx, tabletInfo.Tablet, masterElectTabletAlias, now, "", forceStart); err != nil { - rec.RecordError(fmt.Errorf("tablet %v SetMaster failed: %v", alias, err)) - } - }(alias, tabletInfo) + go handleReplicas(alias, tabletInfo) } } wgMaster.Wait() if masterErr != nil { - // The master failed, there is no way the - // replicas will work. So we cancel them all. - wr.logger.Warningf("master failed to PopulateReparentJournal, canceling replicas") + wr.logger.Warningf("master failed to PopulateReparentJournal") replCancel() - wgReplicas.Wait() + waitOnTablets() return fmt.Errorf("failed to PopulateReparentJournal on master: %v", masterErr) } - // Wait for the replicas to complete. If some of them fail, we - // will rebuild the shard serving graph anyway - wgReplicas.Wait() - if err := rec.Error(); err != nil { - wr.Logger().Errorf2(err, "some replicas failed to reparent") + errorRecorder := waitOnTablets() + if len(errorRecorder.Errors) > 1 { + // We allow one error, for the presumed non-contactable dead master tablet. + wr.Logger().Errorf2(errorRecorder.Error(), "some replicas failed to reparent") return err } return nil } +// waitOnNMinusOneTablets will wait until N-1 tablets have responded via a supplied error channel. In that case that N-1 tablets have responded, +// the supplied cancel function will be called, and we will wait until N tablets return their errors, and then return an AllErrorRecorder to the caller. 
+func waitOnNMinusOneTablets(ctxCancel context.CancelFunc, tabletCount int, errorChannel chan error) *concurrency.AllErrorRecorder { + errCounter := 0 + successCounter := 0 + responseCounter := 0 + rec := &concurrency.AllErrorRecorder{} + + for err := range errorChannel { + responseCounter++ + successCounter++ + if err != nil { + errCounter++ + successCounter-- + rec.RecordError(err) + } + if responseCounter == tabletCount { + // We must wait for any cancelled goroutines to return their error. + break + } + if errCounter > 1 || successCounter == tabletCount-1 { + ctxCancel() + } + } + + return rec +} + // findValidReparentCandidates will find valid candidates for emergency reparent, and if successful, returning them as a list of tablet aliases. func (wr *Wrangler) findValidReparentCandidates(statusMap map[string]*replicationdatapb.StopReplicationStatus, masterStatusMap map[string]*replicationdatapb.MasterStatus) ([]string, error) { // Build out replication status list from proto types. @@ -1109,88 +1201,69 @@ func (wr *Wrangler) findValidReparentCandidates(statusMap map[string]*replicatio func (wr *Wrangler) stopReplicationAndBuildStatusMaps(ctx context.Context, ev *events.Reparent, tabletMap map[string]*topo.TabletInfo, waitReplicasTimeout time.Duration) (map[string]*replicationdatapb.StopReplicationStatus, map[string]*replicationdatapb.MasterStatus, error) { // Stop replication on all replicas, get their current // replication position - wg := sync.WaitGroup{} - mu := sync.Mutex{} - var errCounter uint32 - event.DispatchUpdate(ev, "stop replication on all replicas") statusMap := make(map[string]*replicationdatapb.StopReplicationStatus) masterStatusMap := make(map[string]*replicationdatapb.MasterStatus) + mu := sync.Mutex{} + errChan := make(chan error) groupCtx, groupCancel := context.WithTimeout(ctx, waitReplicasTimeout) defer groupCancel() - for alias, tabletInfo := range tabletMap { - wg.Add(1) - go func(alias string, tabletInfo *topo.TabletInfo) { - defer 
wg.Done() - wr.logger.Infof("getting replication position from %v", alias) - ctx, cancel := context.WithTimeout(groupCtx, waitReplicasTimeout) - defer cancel() - _, stopReplicationStatus, err := wr.tmc.StopReplicationAndGetStatus(ctx, tabletInfo.Tablet, replicationdatapb.StopReplicationMode_IOTHREADONLY) - switch err { - case mysql.ErrNotReplica: - masterStatus, err := wr.tmc.DemoteMaster(ctx, tabletInfo.Tablet) - if err != nil { - wr.logger.Warningf("replica %v thinks it's master but we failed to demote it", alias) - atomic.AddUint32(&errCounter, 1) - if atomic.LoadUint32(&errCounter) > 1 { - groupCancel() - } - return - } - mu.Lock() - masterStatusMap[alias] = masterStatus - mu.Unlock() - - case nil: - mu.Lock() - statusMap[alias] = stopReplicationStatus - mu.Unlock() - - default: - wr.logger.Warningf("failed to get replication status from %v, ignoring tablet: %v", alias, err) - atomic.AddUint32(&errCounter, 1) - if atomic.LoadUint32(&errCounter) > 1 { - groupCancel() - } + fillStatus := func(alias string, tabletInfo *topo.TabletInfo) { + err := fmt.Errorf("fillStatus did not successfully complete") + returnErr := func() { errChan <- err } + defer returnErr() + + wr.logger.Infof("getting replication position from %v", alias) + ctx, cancel := context.WithCancel(groupCtx) + defer cancel() + var stopReplicationStatus *replicationdatapb.StopReplicationStatus + _, stopReplicationStatus, err = wr.tmc.StopReplicationAndGetStatus(ctx, tabletInfo.Tablet, replicationdatapb.StopReplicationMode_IOTHREADONLY) + switch err { + case mysql.ErrNotReplica: + fmt.Printf("Found ErrNotReplica for alias: %v", alias) + masterStatus, err := wr.tmc.DemoteMaster(ctx, tabletInfo.Tablet) + if err != nil { + wr.logger.Warningf("replica %v thinks it's master but we failed to demote it", alias) + return } - }(alias, tabletInfo) + mu.Lock() + masterStatusMap[alias] = masterStatus + mu.Unlock() + + case nil: + mu.Lock() + statusMap[alias] = stopReplicationStatus + mu.Unlock() + + default: + 
wr.logger.Warningf("failed to get replication status from %v, ignoring tablet: %v", alias, err) + err = fmt.Errorf("error when getting replication status for alias %v: %v", alias, err) + } } - wg.Wait() - if errCounter > 1 { - return nil, nil, fmt.Errorf("encountered more than one error when trying to stop replication and get positions") + + for alias, tabletInfo := range tabletMap { + go fillStatus(alias, tabletInfo) } - if (len(statusMap) + len(masterStatusMap)) < len(tabletMap)-1 { - return nil, nil, fmt.Errorf("did not hear back from enough replicas when stopping replication to build status map") + + errRecorder := waitOnNMinusOneTablets(groupCancel, len(tabletMap), errChan) + + if len(errRecorder.Errors) > 1 { + return nil, nil, fmt.Errorf("encountered more than one error when trying to stop replication and get positions: %v", errRecorder.Error()) } return statusMap, masterStatusMap, nil } // WaitForRelayLogsToApply will block execution waiting for the given tablets relay logs to apply, unless the supplied // context is cancelled, or waitReplicasTimeout is exceeded. -func (wr *Wrangler) WaitForRelayLogsToApply(ctx context.Context, tabletInfo *topo.TabletInfo, waitReplicasTimeout time.Duration) error { - ctx, cancel := context.WithTimeout(ctx, waitReplicasTimeout) - defer cancel() - for { - status, err := wr.tmc.ReplicationStatus(ctx, tabletInfo.Tablet) - if err != nil { - if err == mysql.ErrNotReplica { - // This tablet thinks it's not a replica, so we have nothing to wait on as far as applying - // relay logs. 
- return nil - } - return err - } - replicationStatus := mysql.ProtoToReplicationStatus(status) - if !replicationStatus.RelayLogPosition.IsZero() && replicationStatus.Position.AtLeast(replicationStatus.RelayLogPosition) { - return nil - } - if replicationStatus.RelayLogPosition.IsZero() && replicationStatus.FilePosition.AtLeast(replicationStatus.FileRelayLogPosition) { - return nil - } - - time.Sleep(2 * time.Second) +func (wr *Wrangler) WaitForRelayLogsToApply(ctx context.Context, tabletInfo *topo.TabletInfo, status *replicationdatapb.StopReplicationStatus) error { + var err error + if status.After.RelayLogPosition != "" { + err = wr.tmc.WaitForPosition(ctx, tabletInfo.Tablet, status.After.RelayLogPosition) + } else { + err = wr.tmc.WaitForPosition(ctx, tabletInfo.Tablet, status.After.FileRelayLogPosition) } + return err } // TabletExternallyReparented changes the type of new master for this shard to MASTER diff --git a/go/vt/wrangler/testlib/emergency_reparent_shard_test.go b/go/vt/wrangler/testlib/emergency_reparent_shard_test.go index fd01253c8cd..c9fb63cc750 100644 --- a/go/vt/wrangler/testlib/emergency_reparent_shard_test.go +++ b/go/vt/wrangler/testlib/emergency_reparent_shard_test.go @@ -77,6 +77,7 @@ func TestEmergencyReparentShard(t *testing.T) { newMaster.FakeMysqlDaemon.CurrentMasterFilePosition = mysql.Position{ GTIDSet: newMasterRelayLogPos, } + newMaster.FakeMysqlDaemon.WaitMasterPosition = newMaster.FakeMysqlDaemon.CurrentMasterFilePosition newMaster.FakeMysqlDaemon.ExpectedExecuteSuperQueryList = []string{ "STOP SLAVE IO_THREAD", "CREATE DATABASE IF NOT EXISTS _vt", From 88b0788360aa66e93214a3b02615a31a34cfbabf Mon Sep 17 00:00:00 2001 From: Peter Farr Date: Wed, 22 Jul 2020 18:41:22 -0700 Subject: [PATCH 06/26] Added ignore_unreachable_replicas flag along with a test to check that it is working as intended. 
Signed-off-by: Peter Farr --- go/test/endtoend/reparent/reparent_test.go | 134 +++++++++++++++++- go/vt/topo/topoproto/tablet.go | 13 ++ go/vt/vtctl/reparent.go | 6 +- go/vt/wrangler/reparent.go | 22 +-- .../testlib/emergency_reparent_shard_test.go | 3 +- 5 files changed, 160 insertions(+), 18 deletions(-) diff --git a/go/test/endtoend/reparent/reparent_test.go b/go/test/endtoend/reparent/reparent_test.go index f895e048c96..709740bb316 100644 --- a/go/test/endtoend/reparent/reparent_test.go +++ b/go/test/endtoend/reparent/reparent_test.go @@ -25,14 +25,13 @@ import ( "testing" "time" - "vitess.io/vitess/go/mysql" - "vitess.io/vitess/go/vt/log" - "github.com/stretchr/testify/assert" "github.com/stretchr/testify/require" "vitess.io/vitess/go/json2" + "vitess.io/vitess/go/mysql" "vitess.io/vitess/go/test/endtoend/cluster" + "vitess.io/vitess/go/vt/log" querypb "vitess.io/vitess/go/vt/proto/query" topodatapb "vitess.io/vitess/go/vt/proto/topodata" ) @@ -121,7 +120,6 @@ func TestReparentDownMaster(t *testing.T) { "-new_master", tablet62044.Alias, "-wait_replicas_timeout", "30s") require.NoError(t, err) - require.Nil(t, err) // Check that old master tablet is left around for human intervention. err = clusterInstance.VtctlclientProcess.ExecuteCommand("Validate") @@ -270,6 +268,132 @@ func TestReparentNoChoiceDownMaster(t *testing.T) { killTablets(t) } +func TestReparentUnreachableReplicas(t *testing.T) { + defer cluster.PanicHandler(t) + ctx := context.Background() + + for _, tablet := range []cluster.Vttablet{*tablet62344, *tablet62044, *tablet41983, *tablet31981} { + // Create Database + err := tablet.VttabletProcess.CreateDB(keyspaceName) + require.Nil(t, err) + + // Reset status, don't wait for the tablet status. 
We will check it later + tablet.VttabletProcess.ServingStatus = "" + // Init Tablet + err = clusterInstance.VtctlclientProcess.InitTablet(&tablet, tablet.Cell, keyspaceName, hostname, shardName) + require.Nil(t, err) + + // Start the tablet + err = tablet.VttabletProcess.Setup() + require.Nil(t, err) + } + + for _, tablet := range []cluster.Vttablet{*tablet62344, *tablet62044, *tablet41983, *tablet31981} { + err := tablet.VttabletProcess.WaitForTabletTypes([]string{"SERVING", "NOT_SERVING"}) + require.Nil(t, err) + } + + // Init Shard Master + err := clusterInstance.VtctlclientProcess.ExecuteCommand("InitShardMaster", + "-force", fmt.Sprintf("%s/%s", keyspaceName, shardName), tablet62344.Alias) + require.Nil(t, err) + + validateTopology(t, true) + + // create Tables + runSQL(ctx, t, sqlSchema, tablet62344) + + // insert data into the old master, check the connected replica work + insertSQL1 := fmt.Sprintf(insertSQL, 2, 2) + runSQL(ctx, t, insertSQL1, tablet62344) + err = checkInsertedValues(ctx, t, tablet62044, 2) + require.Nil(t, err) + err = checkInsertedValues(ctx, t, tablet41983, 2) + require.Nil(t, err) + err = checkInsertedValues(ctx, t, tablet31981, 2) + require.Nil(t, err) + + // Make the current master agent and database unavailable. + err = tablet62344.VttabletProcess.TearDown() + require.Nil(t, err) + err = tablet62344.MysqlctlProcess.Stop() + require.Nil(t, err) + + // Take down a replica - this should cause the emergency reparent to fail. + err = tablet41983.VttabletProcess.TearDown() + require.Nil(t, err) + err = tablet41983.MysqlctlProcess.Stop() + require.Nil(t, err) + + // We expect this one to fail because we have an unreachable replica + err = clusterInstance.VtctlclientProcess.ExecuteCommand( + "EmergencyReparentShard", + "-keyspace_shard", keyspaceShard, + "-wait_replicas_timeout", "30s") + require.NotNil(t, err) + + // Now let's run it again, but set the command to ignore the unreachable replica. 
+ err = clusterInstance.VtctlclientProcess.ExecuteCommand( + "EmergencyReparentShard", + "-keyspace_shard", keyspaceShard, + "-ignore_unreachable_replicas", tablet41983.Alias, + "-wait_replicas_timeout", "30s") + require.Nil(t, err) + + // We'll bring back the replica we took down. + tablet41983.MysqlctlProcess.InitMysql = false + err = tablet41983.MysqlctlProcess.Start() + require.Nil(t, err) + err = clusterInstance.VtctlclientProcess.InitTablet(tablet41983, tablet41983.Cell, keyspaceName, hostname, shardName) + require.Nil(t, err) + + // Check that old master tablet is left around for human intervention. + err = clusterInstance.VtctlclientProcess.ExecuteCommand("Validate") + require.Error(t, err) + + // Now we'll manually remove the old master, simulating a human cleaning up a dead master. + err = clusterInstance.VtctlclientProcess.ExecuteCommand( + "DeleteTablet", + "-allow_master", + tablet62344.Alias) + require.Nil(t, err) + + // Now validate topo is correct. + validateTopology(t, false) + + var newMasterTablet *cluster.Vttablet + for _, tablet := range []*cluster.Vttablet{tablet62044, tablet41983, tablet31981} { + if isHealthyMasterTablet(t, tablet) { + newMasterTablet = tablet + break + } + } + require.NotNil(t, newMasterTablet) + + // Check new master has latest transaction. 
+ err = checkInsertedValues(ctx, t, newMasterTablet, 2) + require.Nil(t, err) + + // bring back the old master as a replica, check that it catches up + tablet62344.MysqlctlProcess.InitMysql = false + err = tablet62344.MysqlctlProcess.Start() + require.Nil(t, err) + err = clusterInstance.VtctlclientProcess.InitTablet(tablet62344, tablet62344.Cell, keyspaceName, hostname, shardName) + require.Nil(t, err) + + // As there is already a master the new replica will come directly in SERVING state + tablet62344.VttabletProcess.ServingStatus = "SERVING" + // Start the tablet + err = tablet62344.VttabletProcess.Setup() + require.Nil(t, err) + + err = checkInsertedValues(ctx, t, tablet62344, 2) + require.Nil(t, err) + + // Kill tablets + killTablets(t) +} + func TestReparentCrossCell(t *testing.T) { defer cluster.PanicHandler(t) @@ -940,7 +1064,7 @@ func checkMasterTablet(t *testing.T, tablet *cluster.Vttablet) { assert.Equal(t, topodatapb.TabletType_MASTER, tabletType) } -// isHealthyMasterTablet will return if tablet is master AND healthy +// isHealthyMasterTablet will return if tablet is master AND healthy. func isHealthyMasterTablet(t *testing.T, tablet *cluster.Vttablet) bool { result, err := clusterInstance.VtctlclientProcess.ExecuteCommandWithOutput("GetTablet", tablet.Alias) require.Nil(t, err) diff --git a/go/vt/topo/topoproto/tablet.go b/go/vt/topo/topoproto/tablet.go index 7c5dd30a437..bca66befc3d 100644 --- a/go/vt/topo/topoproto/tablet.go +++ b/go/vt/topo/topoproto/tablet.go @@ -26,6 +26,8 @@ import ( "strings" "github.com/golang/protobuf/proto" + "k8s.io/apimachinery/pkg/util/sets" + "vitess.io/vitess/go/netutil" "vitess.io/vitess/go/vt/vterrors" @@ -89,6 +91,17 @@ func ParseTabletAlias(aliasStr string) (*topodatapb.TabletAlias, error) { }, nil } +// ParseTabletSet returns a set of tablets based on a provided comma separated list of tablets. 
+func ParseTabletSet(tabletListStr string) sets.String { + set := sets.NewString() + if tabletListStr == "" { + return set + } + list := strings.Split(tabletListStr, ",") + set.Insert(list...) + return set +} + // ParseUID parses just the uid (a number) func ParseUID(value string) (uint32, error) { uid, err := strconv.ParseUint(value, 10, 32) diff --git a/go/vt/vtctl/reparent.go b/go/vt/vtctl/reparent.go index c1db88b3710..708c707df6e 100644 --- a/go/vt/vtctl/reparent.go +++ b/go/vt/vtctl/reparent.go @@ -49,7 +49,7 @@ func init() { addCommand("Shards", command{ "EmergencyReparentShard", commandEmergencyReparentShard, - "-keyspace_shard= [-new_master=] [-wait_replicas_timeout=]", + "-keyspace_shard= [-new_master=] [-wait_replicas_timeout=] [-ignore_unreachable_tablets=]", "Reparents the shard to the new master. Assumes the old master is dead and not responsding."}) addCommand("Shards", command{ "TabletExternallyReparented", @@ -167,6 +167,7 @@ func commandEmergencyReparentShard(ctx context.Context, wr *wrangler.Wrangler, s } keyspaceShard := subFlags.String("keyspace_shard", "", "keyspace/shard of the shard that needs to be reparented") newMaster := subFlags.String("new_master", "", "alias of a tablet that should be the new master") + ignoreUnreachableReplicasList := subFlags.String("ignore_unreachable_replicas", "", "list of unreachable replica tablet aliases to ignore during emergency reparent") if err := subFlags.Parse(args); err != nil { return err } @@ -192,7 +193,8 @@ func commandEmergencyReparentShard(ctx context.Context, wr *wrangler.Wrangler, s return err } } - return wr.EmergencyReparentShard(ctx, keyspace, shard, tabletAlias, *waitReplicasTimeout) + unreachableReplicas := topoproto.ParseTabletSet(*ignoreUnreachableReplicasList) + return wr.EmergencyReparentShard(ctx, keyspace, shard, tabletAlias, *waitReplicasTimeout, unreachableReplicas) } func commandTabletExternallyReparented(ctx context.Context, wr *wrangler.Wrangler, subFlags *flag.FlagSet, args 
[]string) error { diff --git a/go/vt/wrangler/reparent.go b/go/vt/wrangler/reparent.go index 749c14f1381..3712cc495b1 100644 --- a/go/vt/wrangler/reparent.go +++ b/go/vt/wrangler/reparent.go @@ -845,7 +845,7 @@ func (wr *Wrangler) chooseNewMaster( // EmergencyReparentShard will make the provided tablet the master for // the shard, when the old master is completely unreachable. -func (wr *Wrangler) EmergencyReparentShard(ctx context.Context, keyspace, shard string, masterElectTabletAlias *topodatapb.TabletAlias, waitReplicasTimeout time.Duration) (err error) { +func (wr *Wrangler) EmergencyReparentShard(ctx context.Context, keyspace, shard string, masterElectTabletAlias *topodatapb.TabletAlias, waitReplicasTimeout time.Duration, unreachableReplicas sets.String) (err error) { // lock the shard actionMsg := "EmergencyReparentShard" if masterElectTabletAlias != nil { @@ -861,7 +861,7 @@ func (wr *Wrangler) EmergencyReparentShard(ctx context.Context, keyspace, shard ev := &events.Reparent{} // do the work - err = wr.emergencyReparentShardLocked(ctx, ev, keyspace, shard, masterElectTabletAlias, waitReplicasTimeout) + err = wr.emergencyReparentShardLocked(ctx, ev, keyspace, shard, masterElectTabletAlias, waitReplicasTimeout, unreachableReplicas) if err != nil { event.DispatchUpdate(ev, "failed EmergencyReparentShard: "+err.Error()) } else { @@ -870,7 +870,7 @@ func (wr *Wrangler) EmergencyReparentShard(ctx context.Context, keyspace, shard return err } -func (wr *Wrangler) emergencyReparentShardLocked(ctx context.Context, ev *events.Reparent, keyspace, shard string, masterElectTabletAlias *topodatapb.TabletAlias, waitReplicasTimeout time.Duration) error { +func (wr *Wrangler) emergencyReparentShardLocked(ctx context.Context, ev *events.Reparent, keyspace, shard string, masterElectTabletAlias *topodatapb.TabletAlias, waitReplicasTimeout time.Duration, unreachableReplicas sets.String) error { shardInfo, err := wr.ts.GetShard(ctx, keyspace, shard) if err != nil { return err 
@@ -883,12 +883,12 @@ func (wr *Wrangler) emergencyReparentShardLocked(ctx context.Context, ev *events return err } - statusMap, masterStatusMap, err := wr.stopReplicationAndBuildStatusMaps(ctx, ev, tabletMap, waitReplicasTimeout) + statusMap, masterStatusMap, err := wr.stopReplicationAndBuildStatusMaps(ctx, ev, tabletMap, waitReplicasTimeout, unreachableReplicas) if err != nil { return err } - if len(statusMap)+len(masterStatusMap) < len(tabletMap)-1 { + if len(statusMap)+len(masterStatusMap) < len(tabletMap)-unreachableReplicas.Len()-1 { return fmt.Errorf("didn't get enough tablet responses to find a candidate garaunteed to have newest transaction") } @@ -1027,14 +1027,14 @@ func (wr *Wrangler) emergencyReparentShardLocked(ctx context.Context, ev *events } } waitOnTablets := func() *concurrency.AllErrorRecorder { - return waitOnNMinusOneTablets(replCancel, len(tabletMap), errChan) + return waitOnNMinusOneTablets(replCancel, len(tabletMap)-unreachableReplicas.Len(), errChan) } for alias, tabletInfo := range tabletMap { if alias == newMasterTabletAliasStr { wgMaster.Add(1) go handleMaster(alias, tabletInfo) - } else { + } else if !unreachableReplicas.Has(alias) { go handleReplicas(alias, tabletInfo) } } @@ -1198,7 +1198,7 @@ func (wr *Wrangler) findValidReparentCandidates(statusMap map[string]*replicatio return validCandidates, nil } -func (wr *Wrangler) stopReplicationAndBuildStatusMaps(ctx context.Context, ev *events.Reparent, tabletMap map[string]*topo.TabletInfo, waitReplicasTimeout time.Duration) (map[string]*replicationdatapb.StopReplicationStatus, map[string]*replicationdatapb.MasterStatus, error) { +func (wr *Wrangler) stopReplicationAndBuildStatusMaps(ctx context.Context, ev *events.Reparent, tabletMap map[string]*topo.TabletInfo, waitReplicasTimeout time.Duration, unreachableReplicas sets.String) (map[string]*replicationdatapb.StopReplicationStatus, map[string]*replicationdatapb.MasterStatus, error) { // Stop replication on all replicas, get their current 
// replication position event.DispatchUpdate(ev, "stop replication on all replicas") @@ -1243,10 +1243,12 @@ func (wr *Wrangler) stopReplicationAndBuildStatusMaps(ctx context.Context, ev *e } for alias, tabletInfo := range tabletMap { - go fillStatus(alias, tabletInfo) + if !unreachableReplicas.Has(alias) { + go fillStatus(alias, tabletInfo) + } } - errRecorder := waitOnNMinusOneTablets(groupCancel, len(tabletMap), errChan) + errRecorder := waitOnNMinusOneTablets(groupCancel, len(tabletMap)-unreachableReplicas.Len(), errChan) if len(errRecorder.Errors) > 1 { return nil, nil, fmt.Errorf("encountered more than one error when trying to stop replication and get positions: %v", errRecorder.Error()) diff --git a/go/vt/wrangler/testlib/emergency_reparent_shard_test.go b/go/vt/wrangler/testlib/emergency_reparent_shard_test.go index c9fb63cc750..ff5176292df 100644 --- a/go/vt/wrangler/testlib/emergency_reparent_shard_test.go +++ b/go/vt/wrangler/testlib/emergency_reparent_shard_test.go @@ -23,6 +23,7 @@ import ( "github.com/stretchr/testify/assert" "github.com/stretchr/testify/require" + "k8s.io/apimachinery/pkg/util/sets" "vitess.io/vitess/go/mysql" "vitess.io/vitess/go/vt/logutil" @@ -265,7 +266,7 @@ func TestEmergencyReparentShardMasterElectNotBest(t *testing.T) { defer moreAdvancedReplica.StopActionLoop(t) // run EmergencyReparentShard - err := wr.EmergencyReparentShard(ctx, newMaster.Tablet.Keyspace, newMaster.Tablet.Shard, newMaster.Tablet.Alias, 10*time.Second) + err := wr.EmergencyReparentShard(ctx, newMaster.Tablet.Keyspace, newMaster.Tablet.Shard, newMaster.Tablet.Alias, 10*time.Second, sets.NewString()) assert.Error(t, err) assert.Contains(t, err.Error(), "master elect is either not fully caught up, or") // check what was run From 0b4f29c0457ba49306923ccc87cb6e2e8550e0c4 Mon Sep 17 00:00:00 2001 From: Peter Farr Date: Wed, 22 Jul 2020 19:24:28 -0700 Subject: [PATCH 07/26] Silly commit because CICD seems to still have an old version. 
Signed-off-by: Peter Farr --- go/test/endtoend/reparent/reparent_test.go | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/go/test/endtoend/reparent/reparent_test.go b/go/test/endtoend/reparent/reparent_test.go index 709740bb316..7bc2380d68e 100644 --- a/go/test/endtoend/reparent/reparent_test.go +++ b/go/test/endtoend/reparent/reparent_test.go @@ -293,14 +293,14 @@ func TestReparentUnreachableReplicas(t *testing.T) { require.Nil(t, err) } - // Init Shard Master + // Init Shard Master. err := clusterInstance.VtctlclientProcess.ExecuteCommand("InitShardMaster", "-force", fmt.Sprintf("%s/%s", keyspaceName, shardName), tablet62344.Alias) require.Nil(t, err) validateTopology(t, true) - // create Tables + // Create Tables. runSQL(ctx, t, sqlSchema, tablet62344) // insert data into the old master, check the connected replica work From 5696a12a2dae5a8ba544668ebc18dcd901fbf9e9 Mon Sep 17 00:00:00 2001 From: Peter Farr Date: Mon, 27 Jul 2020 16:19:58 -0700 Subject: [PATCH 08/26] Switch error nil checks to NoError checks per review suggestion. Signed-off-by: Peter Farr --- go/test/endtoend/reparent/reparent_test.go | 43 +++++++++++++--------- 1 file changed, 25 insertions(+), 18 deletions(-) diff --git a/go/test/endtoend/reparent/reparent_test.go b/go/test/endtoend/reparent/reparent_test.go index 7bc2380d68e..ab50c7b9c68 100644 --- a/go/test/endtoend/reparent/reparent_test.go +++ b/go/test/endtoend/reparent/reparent_test.go @@ -78,6 +78,10 @@ func TestReparentDownMaster(t *testing.T) { // Reset status, don't wait for the tablet status. 
We will check it later tablet.VttabletProcess.ServingStatus = "" + // Init Tablet + err = clusterInstance.VtctlclientProcess.InitTablet(&tablet, tablet.Cell, keyspaceName, hostname, shardName) + require.NoError(t, err) + // Start the tablet err = tablet.VttabletProcess.Setup() require.NoError(t, err) @@ -109,6 +113,7 @@ func TestReparentDownMaster(t *testing.T) { err = clusterInstance.VtctlclientProcess.ExecuteCommand( "-action_timeout", "1s", "PlannedReparentShard", + "-wait_replicas_timeout", "5s", "-keyspace_shard", keyspaceShard, "-new_master", tablet62044.Alias) require.Error(t, err) @@ -130,7 +135,7 @@ func TestReparentDownMaster(t *testing.T) { "DeleteTablet", "-allow_master", tablet62344.Alias) - require.Nil(t, err) + require.NoError(t, err) // Now validate topo is correct. validateTopology(t, false) @@ -149,6 +154,8 @@ func TestReparentDownMaster(t *testing.T) { tablet62344.MysqlctlProcess.InitMysql = false err = tablet62344.MysqlctlProcess.Start() require.NoError(t, err) + err = clusterInstance.VtctlclientProcess.InitTablet(tablet62344, tablet62344.Cell, keyspaceName, hostname, shardName) + require.NoError(t, err) // As there is already a master the new replica will come directly in SERVING state tablet62344.VttabletProcess.ServingStatus = "SERVING" @@ -170,28 +177,28 @@ func TestReparentNoChoiceDownMaster(t *testing.T) { for _, tablet := range []cluster.Vttablet{*tablet62344, *tablet62044, *tablet41983, *tablet31981} { // Create Database err := tablet.VttabletProcess.CreateDB(keyspaceName) - require.Nil(t, err) + require.NoError(t, err) // Reset status, don't wait for the tablet status. 
We will check it later tablet.VttabletProcess.ServingStatus = "" // Init Tablet err = clusterInstance.VtctlclientProcess.InitTablet(&tablet, tablet.Cell, keyspaceName, hostname, shardName) - require.Nil(t, err) + require.NoError(t, err) // Start the tablet err = tablet.VttabletProcess.Setup() - require.Nil(t, err) + require.NoError(t, err) } for _, tablet := range []cluster.Vttablet{*tablet62344, *tablet62044, *tablet41983, *tablet31981} { err := tablet.VttabletProcess.WaitForTabletTypes([]string{"SERVING", "NOT_SERVING"}) - require.Nil(t, err) + require.NoError(t, err) } // Init Shard Master err := clusterInstance.VtctlclientProcess.ExecuteCommand("InitShardMaster", "-force", fmt.Sprintf("%s/%s", keyspaceName, shardName), tablet62344.Alias) - require.Nil(t, err) + require.NoError(t, err) validateTopology(t, true) @@ -202,24 +209,24 @@ func TestReparentNoChoiceDownMaster(t *testing.T) { insertSQL1 := fmt.Sprintf(insertSQL, 2, 2) runSQL(ctx, t, insertSQL1, tablet62344) err = checkInsertedValues(ctx, t, tablet62044, 2) - require.Nil(t, err) + require.NoError(t, err) err = checkInsertedValues(ctx, t, tablet41983, 2) - require.Nil(t, err) + require.NoError(t, err) err = checkInsertedValues(ctx, t, tablet31981, 2) - require.Nil(t, err) + require.NoError(t, err) // Make the current master agent and database unavailable. err = tablet62344.VttabletProcess.TearDown() - require.Nil(t, err) + require.NoError(t, err) err = tablet62344.MysqlctlProcess.Stop() - require.Nil(t, err) + require.NoError(t, err) // Run forced reparent operation, this should now proceed unimpeded. err = clusterInstance.VtctlclientProcess.ExecuteCommand( "EmergencyReparentShard", "-keyspace_shard", keyspaceShard, "-wait_replicas_timeout", "30s") - require.Nil(t, err) + require.NoError(t, err) // Check that old master tablet is left around for human intervention. 
err = clusterInstance.VtctlclientProcess.ExecuteCommand("Validate") @@ -230,7 +237,7 @@ func TestReparentNoChoiceDownMaster(t *testing.T) { "DeleteTablet", "-allow_master", tablet62344.Alias) - require.Nil(t, err) + require.NoError(t, err) // Now validate topo is correct. validateTopology(t, false) @@ -246,23 +253,23 @@ func TestReparentNoChoiceDownMaster(t *testing.T) { // Check new master has latest transaction. err = checkInsertedValues(ctx, t, newMasterTablet, 2) - require.Nil(t, err) + require.NoError(t, err) // bring back the old master as a replica, check that it catches up tablet62344.MysqlctlProcess.InitMysql = false err = tablet62344.MysqlctlProcess.Start() - require.Nil(t, err) + require.NoError(t, err) err = clusterInstance.VtctlclientProcess.InitTablet(tablet62344, tablet62344.Cell, keyspaceName, hostname, shardName) - require.Nil(t, err) + require.NoError(t, err) // As there is already a master the new replica will come directly in SERVING state tablet62344.VttabletProcess.ServingStatus = "SERVING" // Start the tablet err = tablet62344.VttabletProcess.Setup() - require.Nil(t, err) + require.NoError(t, err) err = checkInsertedValues(ctx, t, tablet62344, 2) - require.Nil(t, err) + require.NoError(t, err) // Kill tablets killTablets(t) From 08f018c65c6499e200b16c4786c05158efb9e216 Mon Sep 17 00:00:00 2001 From: Peter Farr Date: Mon, 27 Jul 2020 16:46:17 -0700 Subject: [PATCH 09/26] Added explicit checking of error string. Signed-off-by: Peter Farr --- go/test/endtoend/reparent/reparent_test.go | 8 ++++++-- 1 file changed, 6 insertions(+), 2 deletions(-) diff --git a/go/test/endtoend/reparent/reparent_test.go b/go/test/endtoend/reparent/reparent_test.go index ab50c7b9c68..070f0e461f8 100644 --- a/go/test/endtoend/reparent/reparent_test.go +++ b/go/test/endtoend/reparent/reparent_test.go @@ -127,8 +127,9 @@ func TestReparentDownMaster(t *testing.T) { require.NoError(t, err) // Check that old master tablet is left around for human intervention. 
- err = clusterInstance.VtctlclientProcess.ExecuteCommand("Validate") + out, err := clusterInstance.VtctlclientProcess.ExecuteCommandWithOutput("Validate") require.Error(t, err) + require.Contains(t, out, "already has master") // Now we'll manually remove it, simulating a human cleaning up a dead master. err = clusterInstance.VtctlclientProcess.ExecuteCommand( @@ -229,8 +230,9 @@ func TestReparentNoChoiceDownMaster(t *testing.T) { require.NoError(t, err) // Check that old master tablet is left around for human intervention. - err = clusterInstance.VtctlclientProcess.ExecuteCommand("Validate") + out, err := clusterInstance.VtctlclientProcess.ExecuteCommandWithOutput("Validate") require.Error(t, err) + require.Contains(t, out, "already has master") // Now we'll manually remove the old master, simulating a human cleaning up a dead master. err = clusterInstance.VtctlclientProcess.ExecuteCommand( @@ -250,6 +252,8 @@ func TestReparentNoChoiceDownMaster(t *testing.T) { } } require.NotNil(t, newMasterTablet) + // Validate new master is not old master. + require.NotEqual(t, newMasterTablet.Alias, tablet62344.Alias) // Check new master has latest transaction. err = checkInsertedValues(ctx, t, newMasterTablet, 2) From a69e6a09e5c3cb6591ffda42c602f5ab82ff8761 Mon Sep 17 00:00:00 2001 From: Peter Farr Date: Thu, 30 Jul 2020 19:18:03 -0700 Subject: [PATCH 10/26] Trivial fixes per review suggestions. 
Signed-off-by: Peter Farr --- go/vt/vtctl/reparent.go | 10 +++--- go/vt/wrangler/reparent.go | 68 +++++++++++++++++--------------------- 2 files changed, 36 insertions(+), 42 deletions(-) diff --git a/go/vt/vtctl/reparent.go b/go/vt/vtctl/reparent.go index 708c707df6e..b4cca092825 100644 --- a/go/vt/vtctl/reparent.go +++ b/go/vt/vtctl/reparent.go @@ -49,8 +49,8 @@ func init() { addCommand("Shards", command{ "EmergencyReparentShard", commandEmergencyReparentShard, - "-keyspace_shard= [-new_master=] [-wait_replicas_timeout=] [-ignore_unreachable_tablets=]", - "Reparents the shard to the new master. Assumes the old master is dead and not responsding."}) + "-keyspace_shard= [-new_master=] [-wait_replicas_timeout=] [-ignore_replicas=]", + "Reparents the shard to the new master. Assumes the old master is dead and not responding."}) addCommand("Shards", command{ "TabletExternallyReparented", commandTabletExternallyReparented, @@ -166,8 +166,8 @@ func commandEmergencyReparentShard(ctx context.Context, wr *wrangler.Wrangler, s *waitReplicasTimeout = *deprecatedTimeout } keyspaceShard := subFlags.String("keyspace_shard", "", "keyspace/shard of the shard that needs to be reparented") - newMaster := subFlags.String("new_master", "", "alias of a tablet that should be the new master") - ignoreUnreachableReplicasList := subFlags.String("ignore_unreachable_replicas", "", "list of unreachable replica tablet aliases to ignore during emergency reparent") + newMaster := subFlags.String("new_master", "", "optional alias of a tablet that should be the new master. 
If not specified, Vitess will select the best candidate") + ignoreReplicasList := subFlags.String("ignore_replicas", "", "comma-separated list of replica tablet aliases to ignore during emergency reparent") if err := subFlags.Parse(args); err != nil { return err } @@ -193,7 +193,7 @@ func commandEmergencyReparentShard(ctx context.Context, wr *wrangler.Wrangler, s return err } } - unreachableReplicas := topoproto.ParseTabletSet(*ignoreUnreachableReplicasList) + unreachableReplicas := topoproto.ParseTabletSet(*ignoreReplicasList) return wr.EmergencyReparentShard(ctx, keyspace, shard, tabletAlias, *waitReplicasTimeout, unreachableReplicas) } diff --git a/go/vt/wrangler/reparent.go b/go/vt/wrangler/reparent.go index 3712cc495b1..85e3097555a 100644 --- a/go/vt/wrangler/reparent.go +++ b/go/vt/wrangler/reparent.go @@ -849,7 +849,7 @@ func (wr *Wrangler) EmergencyReparentShard(ctx context.Context, keyspace, shard // lock the shard actionMsg := "EmergencyReparentShard" if masterElectTabletAlias != nil { - actionMsg = fmt.Sprintf("%v(%v)", actionMsg, topoproto.TabletAliasString(masterElectTabletAlias)) + actionMsg += fmt.Sprintf("(%v)", topoproto.TabletAliasString(masterElectTabletAlias)) } ctx, unlock, lockErr := wr.ts.LockShard(ctx, keyspace, shard, actionMsg) if lockErr != nil { @@ -883,15 +883,20 @@ func (wr *Wrangler) emergencyReparentShardLocked(ctx context.Context, ev *events return err } + var newMasterTabletAliasStr string + if masterElectTabletAlias != nil { + newMasterTabletAliasStr = topoproto.TabletAliasString(masterElectTabletAlias) + _, ok := tabletMap[newMasterTabletAliasStr] + if !ok { + return fmt.Errorf("master-elect tablet %v is not in the shard", newMasterTabletAliasStr) + } + } + statusMap, masterStatusMap, err := wr.stopReplicationAndBuildStatusMaps(ctx, ev, tabletMap, waitReplicasTimeout, unreachableReplicas) if err != nil { return err } - if len(statusMap)+len(masterStatusMap) < len(tabletMap)-unreachableReplicas.Len()-1 { - return 
fmt.Errorf("didn't get enough tablet responses to find a candidate garaunteed to have newest transaction") - } - // Check we still have the topology lock. if err := topo.CheckShardLocked(ctx, keyspace, shard); err != nil { return fmt.Errorf("lost topology lock, aborting: %v", err) @@ -905,42 +910,31 @@ func (wr *Wrangler) emergencyReparentShardLocked(ctx context.Context, ev *events return fmt.Errorf("no valid candidates for emergency reparent") } - var newMasterTabletAliasStr string if masterElectTabletAlias != nil { - masterElectTabletAliasStr := topoproto.TabletAliasString(masterElectTabletAlias) - masterElectTabletInfo, ok := tabletMap[masterElectTabletAliasStr] - if !ok { - return fmt.Errorf("master-elect tablet %v is not in the shard", masterElectTabletAliasStr) - } + masterElectTabletInfo := tabletMap[newMasterTabletAliasStr] ev.NewMaster = *masterElectTabletInfo.Tablet - if topoproto.TabletAliasEqual(shardInfo.MasterAlias, masterElectTabletAlias) { - return fmt.Errorf("master-elect tablet %v is already the master", topoproto.TabletAliasString(masterElectTabletAlias)) - } validCandidatesSet := sets.NewString(validCandidates...) - if !validCandidatesSet.Has(masterElectTabletAliasStr) { + if !validCandidatesSet.Has(newMasterTabletAliasStr) { return fmt.Errorf("master elect is either not fully caught up, or has errant GTIDs") } // Wait for masterElect to catch up. - err = wr.WaitForRelayLogsToApply(ctx, masterElectTabletInfo, statusMap[masterElectTabletAliasStr]) + err = wr.WaitForRelayLogsToApply(ctx, masterElectTabletInfo, statusMap[newMasterTabletAliasStr]) if err != nil { return err } - newMasterTabletAliasStr = masterElectTabletAliasStr } else { - // race. + // User has not provided a master elect, so we run a race among our valid candidates. + // The first candidate (tablet) to succeed at applying its relay logs is the winner. 
aliasChan := make(chan string) - groupCtx, groupCancel := context.WithCancel(ctx) + groupCtx, groupCancel := context.WithTimeout(ctx, waitReplicasTimeout) defer groupCancel() for _, candidate := range validCandidates { go func(alias string) { var resultAlias string - done := func() { aliasChan <- resultAlias } - defer done() - ctx, cancel := context.WithCancel(groupCtx) - defer cancel() + defer func() { aliasChan <- resultAlias }() - err = wr.WaitForRelayLogsToApply(ctx, tabletMap[alias], statusMap[alias]) + err = wr.WaitForRelayLogsToApply(groupCtx, tabletMap[alias], statusMap[alias]) if err == nil { resultAlias = alias } @@ -951,8 +945,10 @@ func (wr *Wrangler) emergencyReparentShardLocked(ctx context.Context, ev *events var winningTabletAlias string for alias := range aliasChan { resultCounter++ - if alias != "" { + if alias != "" && winningTabletAlias == "" { winningTabletAlias = alias + // We only need the first tablet that succeeds, so we can cancel the other tablets + // and move along. groupCancel() } if resultCounter == len(validCandidates) { @@ -985,7 +981,7 @@ func (wr *Wrangler) emergencyReparentShardLocked(ctx context.Context, ev *events // Create a cancelable context for the following RPCs. // If error conditions happen, we can cancel all outgoing RPCs. 
- replCtx, replCancel := context.WithCancel(ctx) + replCtx, replCancel := context.WithTimeout(ctx, waitReplicasTimeout) defer replCancel() // Reset replication on all replicas to point to the new master, and @@ -1001,8 +997,7 @@ func (wr *Wrangler) emergencyReparentShardLocked(ctx context.Context, ev *events handleMaster := func(alias string, tabletInfo *topo.TabletInfo) { err := fmt.Errorf("handleMaster never finished execution for alias: %v", alias) - sendErr := func() { errChan <- err } - defer sendErr() + defer func() { errChan <- err }() defer wgMaster.Done() wr.logger.Infof("populating reparent journal on new master %v", alias) @@ -1011,10 +1006,9 @@ func (wr *Wrangler) emergencyReparentShardLocked(ctx context.Context, ev *events masterErr = err } } - handleReplicas := func(alias string, tabletInfo *topo.TabletInfo) { + handleReplica := func(alias string, tabletInfo *topo.TabletInfo) { var err error - sendErr := func() { errChan <- err } - defer sendErr() + defer func() { errChan <- err }() wr.logger.Infof("setting new master on replica %v", alias) forceStart := false @@ -1035,7 +1029,7 @@ func (wr *Wrangler) emergencyReparentShardLocked(ctx context.Context, ev *events wgMaster.Add(1) go handleMaster(alias, tabletInfo) } else if !unreachableReplicas.Has(alias) { - go handleReplicas(alias, tabletInfo) + go handleReplica(alias, tabletInfo) } } @@ -1067,11 +1061,11 @@ func waitOnNMinusOneTablets(ctxCancel context.CancelFunc, tabletCount int, error for err := range errorChannel { responseCounter++ - successCounter++ if err != nil { errCounter++ - successCounter-- rec.RecordError(err) + } else { + successCounter++ } if responseCounter == tabletCount { // We must wait for any cancelled goroutines to return their error. 
@@ -1175,6 +1169,7 @@ func (wr *Wrangler) findValidReparentCandidates(statusMap map[string]*replicatio for _, position := range positionMap { if winningPosition.IsZero() { winningPosition = position + continue } if position.AtLeast(winningPosition) { winningPosition = position @@ -1211,8 +1206,7 @@ func (wr *Wrangler) stopReplicationAndBuildStatusMaps(ctx context.Context, ev *e defer groupCancel() fillStatus := func(alias string, tabletInfo *topo.TabletInfo) { err := fmt.Errorf("fillStatus did not successfully complete") - returnErr := func() { errChan <- err } - defer returnErr() + defer func() { errChan <- err }() wr.logger.Infof("getting replication position from %v", alias) ctx, cancel := context.WithCancel(groupCtx) @@ -1237,7 +1231,7 @@ func (wr *Wrangler) stopReplicationAndBuildStatusMaps(ctx context.Context, ev *e mu.Unlock() default: - wr.logger.Warningf("failed to get replication status from %v, ignoring tablet: %v", alias, err) + wr.logger.Warningf("failed to get replication status from %v: %v", alias, err) err = fmt.Errorf("error when getting replication status for alias %v: %v", alias, err) } } From 6a7d62a775c8757d2c02081a85b1c4a1535cadea Mon Sep 17 00:00:00 2001 From: Peter Farr Date: Fri, 31 Jul 2020 01:06:47 -0700 Subject: [PATCH 11/26] Refactor per review suggestion to reduce number of code paths. 
Signed-off-by: Peter Farr --- go/test/endtoend/reparent/reparent_test.go | 3 +- go/vt/wrangler/reparent.go | 75 ++++++++++------------ 2 files changed, 36 insertions(+), 42 deletions(-) diff --git a/go/test/endtoend/reparent/reparent_test.go b/go/test/endtoend/reparent/reparent_test.go index 070f0e461f8..5bd0713fa2b 100644 --- a/go/test/endtoend/reparent/reparent_test.go +++ b/go/test/endtoend/reparent/reparent_test.go @@ -124,6 +124,7 @@ func TestReparentDownMaster(t *testing.T) { "-keyspace_shard", keyspaceShard, "-new_master", tablet62044.Alias, "-wait_replicas_timeout", "30s") + require.Nil(t, err) require.NoError(t, err) // Check that old master tablet is left around for human intervention. @@ -347,7 +348,7 @@ func TestReparentUnreachableReplicas(t *testing.T) { err = clusterInstance.VtctlclientProcess.ExecuteCommand( "EmergencyReparentShard", "-keyspace_shard", keyspaceShard, - "-ignore_unreachable_replicas", tablet41983.Alias, + "-ignore_replicas", tablet41983.Alias, "-wait_replicas_timeout", "30s") require.Nil(t, err) diff --git a/go/vt/wrangler/reparent.go b/go/vt/wrangler/reparent.go index 85e3097555a..90969d45746 100644 --- a/go/vt/wrangler/reparent.go +++ b/go/vt/wrangler/reparent.go @@ -910,56 +910,49 @@ func (wr *Wrangler) emergencyReparentShardLocked(ctx context.Context, ev *events return fmt.Errorf("no valid candidates for emergency reparent") } + validCandidatesSet := sets.NewString(validCandidates...) if masterElectTabletAlias != nil { - masterElectTabletInfo := tabletMap[newMasterTabletAliasStr] - ev.NewMaster = *masterElectTabletInfo.Tablet - validCandidatesSet := sets.NewString(validCandidates...) if !validCandidatesSet.Has(newMasterTabletAliasStr) { return fmt.Errorf("master elect is either not fully caught up, or has errant GTIDs") } + validCandidatesSet = sets.NewString(newMasterTabletAliasStr) + } - // Wait for masterElect to catch up. 
- err = wr.WaitForRelayLogsToApply(ctx, masterElectTabletInfo, statusMap[newMasterTabletAliasStr]) - if err != nil { - return err - } - } else { - // User has not provided a master elect, so we run a race among our valid candidates. - // The first candidate (tablet) to succeed at applying its relay logs is the winner. - aliasChan := make(chan string) - groupCtx, groupCancel := context.WithTimeout(ctx, waitReplicasTimeout) - defer groupCancel() - for _, candidate := range validCandidates { - go func(alias string) { - var resultAlias string - defer func() { aliasChan <- resultAlias }() - - err = wr.WaitForRelayLogsToApply(groupCtx, tabletMap[alias], statusMap[alias]) - if err == nil { - resultAlias = alias - } - }(candidate) - } - - resultCounter := 0 - var winningTabletAlias string - for alias := range aliasChan { - resultCounter++ - if alias != "" && winningTabletAlias == "" { - winningTabletAlias = alias - // We only need the first tablet that succeeds, so we can cancel the other tablets - // and move along. - groupCancel() - } - if resultCounter == len(validCandidates) { - break + // We run a race among our valid candidates. + // The first candidate (tablet) to succeed at applying its relay logs is the winner. + aliasChan := make(chan string) + groupCtx, groupCancel := context.WithTimeout(ctx, waitReplicasTimeout) + defer groupCancel() + for candidate := range validCandidatesSet { + go func(alias string) { + var resultAlias string + defer func() { aliasChan <- resultAlias }() + + err = wr.WaitForRelayLogsToApply(groupCtx, tabletMap[alias], statusMap[alias]) + if err == nil { + resultAlias = alias } + }(candidate) + } + + resultCounter := 0 + var winningTabletAlias string + for alias := range aliasChan { + resultCounter++ + if alias != "" && winningTabletAlias == "" { + winningTabletAlias = alias + // We only need the first tablet that succeeds, so we can cancel the other tablets + // and move along. 
+ groupCancel() } - if winningTabletAlias == "" { - return fmt.Errorf("could not find a valid candidate for new master that applied its relay logs under the provided wait_replicas_timeout") + if resultCounter == validCandidatesSet.Len() { + break } - newMasterTabletAliasStr = winningTabletAlias } + if winningTabletAlias == "" { + return fmt.Errorf("could not find a valid candidate for new master that applied its relay logs under the provided wait_replicas_timeout") + } + newMasterTabletAliasStr = winningTabletAlias // Check we still have the topology lock. if err := topo.CheckShardLocked(ctx, keyspace, shard); err != nil { From 80ef34412215e411756b2ea80ddb03511f765612 Mon Sep 17 00:00:00 2001 From: Peter Farr Date: Fri, 31 Jul 2020 16:37:53 -0700 Subject: [PATCH 12/26] Record all errors and surface them in the case we could not get a winning tablet in time. Signed-off-by: Peter Farr --- go/vt/wrangler/reparent.go | 5 ++++- 1 file changed, 4 insertions(+), 1 deletion(-) diff --git a/go/vt/wrangler/reparent.go b/go/vt/wrangler/reparent.go index 90969d45746..41a7b1f14e2 100644 --- a/go/vt/wrangler/reparent.go +++ b/go/vt/wrangler/reparent.go @@ -921,6 +921,7 @@ func (wr *Wrangler) emergencyReparentShardLocked(ctx context.Context, ev *events // We run a race among our valid candidates. // The first candidate (tablet) to succeed at applying its relay logs is the winner. 
aliasChan := make(chan string) + rec := &concurrency.AllErrorRecorder{} groupCtx, groupCancel := context.WithTimeout(ctx, waitReplicasTimeout) defer groupCancel() for candidate := range validCandidatesSet { @@ -931,6 +932,8 @@ func (wr *Wrangler) emergencyReparentShardLocked(ctx context.Context, ev *events err = wr.WaitForRelayLogsToApply(groupCtx, tabletMap[alias], statusMap[alias]) if err == nil { resultAlias = alias + } else { + rec.RecordError(err) } }(candidate) } @@ -950,7 +953,7 @@ func (wr *Wrangler) emergencyReparentShardLocked(ctx context.Context, ev *events } } if winningTabletAlias == "" { - return fmt.Errorf("could not find a valid candidate for new master that applied its relay logs under the provided wait_replicas_timeout") + return fmt.Errorf("could not find a valid candidate for new master that applied its relay logs under the provided wait_replicas_timeout: %v", rec.Error()) } newMasterTabletAliasStr = winningTabletAlias From b231de251ef816b98942762ba3d4123b2588a257 Mon Sep 17 00:00:00 2001 From: Peter Farr Date: Fri, 31 Jul 2020 16:49:11 -0700 Subject: [PATCH 13/26] Updated waitOnNMinusOneTablets helper function to take a maximum acceptable number of errors as an argument to accommodate a use case where any number of errors is acceptable. 
Signed-off-by: Peter Farr --- go/vt/wrangler/reparent.go | 8 ++++---- 1 file changed, 4 insertions(+), 4 deletions(-) diff --git a/go/vt/wrangler/reparent.go b/go/vt/wrangler/reparent.go index 41a7b1f14e2..3ac3be900aa 100644 --- a/go/vt/wrangler/reparent.go +++ b/go/vt/wrangler/reparent.go @@ -1017,7 +1017,7 @@ func (wr *Wrangler) emergencyReparentShardLocked(ctx context.Context, ev *events } } waitOnTablets := func() *concurrency.AllErrorRecorder { - return waitOnNMinusOneTablets(replCancel, len(tabletMap)-unreachableReplicas.Len(), errChan) + return waitOnNMinusOneTablets(replCancel, len(tabletMap)-unreachableReplicas.Len(), errChan, len(tabletMap)) } for alias, tabletInfo := range tabletMap { @@ -1049,7 +1049,7 @@ func (wr *Wrangler) emergencyReparentShardLocked(ctx context.Context, ev *events // waitOnNMinusOneTablets will wait until N-1 tablets have responded via a supplied error channel. In that case that N-1 tablets have responded, // the supplied cancel function will be called, and we will wait until N tablets return their errors, and then return an AllErrorRecorder to the caller. -func waitOnNMinusOneTablets(ctxCancel context.CancelFunc, tabletCount int, errorChannel chan error) *concurrency.AllErrorRecorder { +func waitOnNMinusOneTablets(ctxCancel context.CancelFunc, tabletCount int, errorChannel chan error, acceptableErrCnt int) *concurrency.AllErrorRecorder { errCounter := 0 successCounter := 0 responseCounter := 0 @@ -1067,7 +1067,7 @@ func waitOnNMinusOneTablets(ctxCancel context.CancelFunc, tabletCount int, error // We must wait for any cancelled goroutines to return their error. 
break } - if errCounter > 1 || successCounter == tabletCount-1 { + if errCounter > acceptableErrCnt || successCounter == tabletCount-1 { ctxCancel() } } @@ -1238,7 +1238,7 @@ func (wr *Wrangler) stopReplicationAndBuildStatusMaps(ctx context.Context, ev *e } } - errRecorder := waitOnNMinusOneTablets(groupCancel, len(tabletMap)-unreachableReplicas.Len(), errChan) + errRecorder := waitOnNMinusOneTablets(groupCancel, len(tabletMap)-unreachableReplicas.Len(), errChan, 1) if len(errRecorder.Errors) > 1 { return nil, nil, fmt.Errorf("encountered more than one error when trying to stop replication and get positions: %v", errRecorder.Error()) From 7e71b5953e209b10c7a8aed1cf4856d1d9d6f7dc Mon Sep 17 00:00:00 2001 From: Peter Farr Date: Fri, 31 Jul 2020 17:08:55 -0700 Subject: [PATCH 14/26] Simplify design drastically per review suggestion to synchronously wait on handleMaster. Signed-off-by: Peter Farr --- go/vt/wrangler/reparent.go | 29 ++++++----------------------- 1 file changed, 6 insertions(+), 23 deletions(-) diff --git a/go/vt/wrangler/reparent.go b/go/vt/wrangler/reparent.go index 3ac3be900aa..6ccb386b369 100644 --- a/go/vt/wrangler/reparent.go +++ b/go/vt/wrangler/reparent.go @@ -988,19 +988,10 @@ func (wr *Wrangler) emergencyReparentShardLocked(ctx context.Context, ev *events event.DispatchUpdate(ev, "reparenting all tablets") now := time.Now().UnixNano() errChan := make(chan error) - wgMaster := sync.WaitGroup{} - var masterErr error - - handleMaster := func(alias string, tabletInfo *topo.TabletInfo) { - err := fmt.Errorf("handleMaster never finished execution for alias: %v", alias) - defer func() { errChan <- err }() - defer wgMaster.Done() + handleMaster := func(alias string, tabletInfo *topo.TabletInfo) error { wr.logger.Infof("populating reparent journal on new master %v", alias) - err = wr.tmc.PopulateReparentJournal(replCtx, tabletInfo.Tablet, now, emergencyReparentShardOperation, tabletMap[newMasterTabletAliasStr].Alias, rp) - if err != nil { - masterErr = 
err - } + return wr.tmc.PopulateReparentJournal(replCtx, tabletInfo.Tablet, now, emergencyReparentShardOperation, tabletMap[newMasterTabletAliasStr].Alias, rp) } handleReplica := func(alias string, tabletInfo *topo.TabletInfo) { var err error @@ -1017,33 +1008,25 @@ func (wr *Wrangler) emergencyReparentShardLocked(ctx context.Context, ev *events } } waitOnTablets := func() *concurrency.AllErrorRecorder { - return waitOnNMinusOneTablets(replCancel, len(tabletMap)-unreachableReplicas.Len(), errChan, len(tabletMap)) + return waitOnNMinusOneTablets(replCancel, len(tabletMap)-unreachableReplicas.Len()-1, errChan, len(tabletMap)) } for alias, tabletInfo := range tabletMap { if alias == newMasterTabletAliasStr { - wgMaster.Add(1) - go handleMaster(alias, tabletInfo) + continue } else if !unreachableReplicas.Has(alias) { go handleReplica(alias, tabletInfo) } } + defer waitOnTablets() - wgMaster.Wait() + masterErr := handleMaster(newMasterTabletAliasStr, tabletMap[newMasterTabletAliasStr]) if masterErr != nil { wr.logger.Warningf("master failed to PopulateReparentJournal") replCancel() - waitOnTablets() return fmt.Errorf("failed to PopulateReparentJournal on master: %v", masterErr) } - errorRecorder := waitOnTablets() - if len(errorRecorder.Errors) > 1 { - // We allow one error, for the presumed non-contactable dead master tablet. - wr.Logger().Errorf2(errorRecorder.Error(), "some replicas failed to reparent") - return err - } - return nil } From 5f7d2ccad8e3a968ef60f0b9fa2917e0271c8933 Mon Sep 17 00:00:00 2001 From: Peter Farr Date: Tue, 4 Aug 2020 19:43:40 -0700 Subject: [PATCH 15/26] Fix test issue now that SetMaster calls are just best effort and done on a defer. 
Signed-off-by: Peter Farr --- go/vt/wrangler/testlib/emergency_reparent_shard_test.go | 2 -- 1 file changed, 2 deletions(-) diff --git a/go/vt/wrangler/testlib/emergency_reparent_shard_test.go b/go/vt/wrangler/testlib/emergency_reparent_shard_test.go index ff5176292df..ee061108757 100644 --- a/go/vt/wrangler/testlib/emergency_reparent_shard_test.go +++ b/go/vt/wrangler/testlib/emergency_reparent_shard_test.go @@ -103,8 +103,6 @@ func TestEmergencyReparentShard(t *testing.T) { oldMaster.FakeMysqlDaemon.SetMasterInput = topoproto.MysqlAddr(newMaster.Tablet) oldMaster.FakeMysqlDaemon.ExpectedExecuteSuperQueryList = []string{ "STOP SLAVE", - "FAKE SET MASTER", - "START SLAVE", } oldMaster.StartActionLoop(t, wr) defer oldMaster.StopActionLoop(t) From 3af3cf1a82dccd0b7290b2146df944994c4e44bf Mon Sep 17 00:00:00 2001 From: Peter Farr Date: Mon, 17 Aug 2020 18:07:36 -0700 Subject: [PATCH 16/26] Small fixes per review suggestions. Signed-off-by: Peter Farr --- go/vt/wrangler/reparent.go | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/go/vt/wrangler/reparent.go b/go/vt/wrangler/reparent.go index 6ccb386b369..c1961143640 100644 --- a/go/vt/wrangler/reparent.go +++ b/go/vt/wrangler/reparent.go @@ -847,7 +847,7 @@ func (wr *Wrangler) chooseNewMaster( // the shard, when the old master is completely unreachable. 
func (wr *Wrangler) EmergencyReparentShard(ctx context.Context, keyspace, shard string, masterElectTabletAlias *topodatapb.TabletAlias, waitReplicasTimeout time.Duration, unreachableReplicas sets.String) (err error) { // lock the shard - actionMsg := "EmergencyReparentShard" + actionMsg := emergencyReparentShardOperation if masterElectTabletAlias != nil { actionMsg += fmt.Sprintf("(%v)", topoproto.TabletAliasString(masterElectTabletAlias)) } @@ -953,7 +953,7 @@ func (wr *Wrangler) emergencyReparentShardLocked(ctx context.Context, ev *events } } if winningTabletAlias == "" { - return fmt.Errorf("could not find a valid candidate for new master that applied its relay logs under the provided wait_replicas_timeout: %v", rec.Error()) + return fmt.Errorf("could not find a valid candidate for new master that applied its relay logs within the provided wait_replicas_timeout: %v", rec.Error()) } newMasterTabletAliasStr = winningTabletAlias From 1ccfab439e39f1c58891a3637beb6465aefeb0ae Mon Sep 17 00:00:00 2001 From: Peter Farr Date: Tue, 18 Aug 2020 18:45:08 -0700 Subject: [PATCH 17/26] Switched all fmt.Errorf to vterrors.Errorf or Wrapf. 
Signed-off-by: Peter Farr --- go/vt/wrangler/reparent.go | 31 ++++++++++++++++--------------- 1 file changed, 16 insertions(+), 15 deletions(-) diff --git a/go/vt/wrangler/reparent.go b/go/vt/wrangler/reparent.go index c1961143640..5540254e952 100644 --- a/go/vt/wrangler/reparent.go +++ b/go/vt/wrangler/reparent.go @@ -888,7 +888,7 @@ func (wr *Wrangler) emergencyReparentShardLocked(ctx context.Context, ev *events newMasterTabletAliasStr = topoproto.TabletAliasString(masterElectTabletAlias) _, ok := tabletMap[newMasterTabletAliasStr] if !ok { - return fmt.Errorf("master-elect tablet %v is not in the shard", newMasterTabletAliasStr) + return vterrors.Errorf(vtrpcpb.Code_FAILED_PRECONDITION, "master-elect tablet %v is not in the shard", newMasterTabletAliasStr) } } @@ -899,7 +899,7 @@ func (wr *Wrangler) emergencyReparentShardLocked(ctx context.Context, ev *events // Check we still have the topology lock. if err := topo.CheckShardLocked(ctx, keyspace, shard); err != nil { - return fmt.Errorf("lost topology lock, aborting: %v", err) + return vterrors.Errorf(vtrpcpb.Code_UNAVAILABLE, "lost topology lock, aborting: %v", err) } validCandidates, err := wr.findValidReparentCandidates(statusMap, masterStatusMap) @@ -907,13 +907,13 @@ func (wr *Wrangler) emergencyReparentShardLocked(ctx context.Context, ev *events return err } if len(validCandidates) == 0 { - return fmt.Errorf("no valid candidates for emergency reparent") + return vterrors.Errorf(vtrpcpb.Code_FAILED_PRECONDITION, "no valid candidates for emergency reparent") } validCandidatesSet := sets.NewString(validCandidates...) 
if masterElectTabletAlias != nil { if !validCandidatesSet.Has(newMasterTabletAliasStr) { - return fmt.Errorf("master elect is either not fully caught up, or has errant GTIDs") + return vterrors.Errorf(vtrpcpb.Code_FAILED_PRECONDITION, "master elect is either not fully caught up, or has errant GTIDs") } validCandidatesSet = sets.NewString(newMasterTabletAliasStr) } @@ -953,13 +953,13 @@ func (wr *Wrangler) emergencyReparentShardLocked(ctx context.Context, ev *events } } if winningTabletAlias == "" { - return fmt.Errorf("could not find a valid candidate for new master that applied its relay logs within the provided wait_replicas_timeout: %v", rec.Error()) + return vterrors.Wrapf(rec.Error(), "could not find a valid candidate for new master that applied its relay logs within the provided wait_replicas_timeout: %v", rec.Error()) } newMasterTabletAliasStr = winningTabletAlias // Check we still have the topology lock. if err := topo.CheckShardLocked(ctx, keyspace, shard); err != nil { - return fmt.Errorf("lost topology lock, aborting: %v", err) + return vterrors.Wrapf(err, "lost topology lock, aborting: %v", err) } // Promote the masterElect @@ -967,12 +967,12 @@ func (wr *Wrangler) emergencyReparentShardLocked(ctx context.Context, ev *events event.DispatchUpdate(ev, "promoting replica") rp, err := wr.tmc.PromoteReplica(ctx, tabletMap[newMasterTabletAliasStr].Tablet) if err != nil { - return fmt.Errorf("master-elect tablet %v failed to be upgraded to master: %v", newMasterTabletAliasStr, err) + return vterrors.Wrapf(err, "master-elect tablet %v failed to be upgraded to master: %v", newMasterTabletAliasStr, err) } // Check we still have the topology lock. if err := topo.CheckShardLocked(ctx, keyspace, shard); err != nil { - return fmt.Errorf("lost topology lock, aborting: %v", err) + return vterrors.Wrapf(err, "lost topology lock, aborting: %v", err) } // Create a cancelable context for the following RPCs. 
@@ -1004,7 +1004,7 @@ func (wr *Wrangler) emergencyReparentShardLocked(ctx context.Context, ev *events } err = wr.tmc.SetMaster(replCtx, tabletInfo.Tablet, tabletMap[newMasterTabletAliasStr].Alias, now, "", forceStart) if err != nil { - err = fmt.Errorf("tablet %v SetMaster failed: %v", alias, err) + err = vterrors.Wrapf(err, "tablet %v SetMaster failed: %v", alias, err) } } waitOnTablets := func() *concurrency.AllErrorRecorder { @@ -1024,7 +1024,7 @@ func (wr *Wrangler) emergencyReparentShardLocked(ctx context.Context, ev *events if masterErr != nil { wr.logger.Warningf("master failed to PopulateReparentJournal") replCancel() - return fmt.Errorf("failed to PopulateReparentJournal on master: %v", masterErr) + return vterrors.Wrapf(masterErr, "failed to PopulateReparentJournal on master: %v", masterErr) } return nil @@ -1076,7 +1076,7 @@ func (wr *Wrangler) findValidReparentCandidates(statusMap map[string]*replicatio break } else if status.RelayLogPosition.IsZero() { // Bail. We have an odd one in the bunch. - return nil, fmt.Errorf("encountered tablet %v with no relay log position, when at least one other tablet in the status map has a relay log positions", alias) + return nil, vterrors.Errorf(vtrpcpb.Code_UNAVAILABLE, "encountered tablet %v with no relay log position, when at least one other tablet in the status map has a relay log positions", alias) } } @@ -1106,7 +1106,7 @@ func (wr *Wrangler) findValidReparentCandidates(statusMap map[string]*replicatio relayLogGTIDSet, ok := status.RelayLogPosition.GTIDSet.(mysql.Mysql56GTIDSet) if !ok { - return nil, fmt.Errorf("we got a filled in relay log position for a tablet, even though the GTIDSet is not of type Mysql56GTIDSet. We don't know how this could happen") + return nil, vterrors.Errorf(vtrpcpb.Code_FAILED_PRECONDITION, "we got a filled in relay log position for a tablet, even though the GTIDSet is not of type Mysql56GTIDSet. 
We don't know how this could happen") } cleanedGTIDSet := relayLogGTIDSet.Difference(errantGTIDs) cleanedRelayLogPosition := mysql.Position{GTIDSet: cleanedGTIDSet} @@ -1195,7 +1195,8 @@ func (wr *Wrangler) stopReplicationAndBuildStatusMaps(ctx context.Context, ev *e switch err { case mysql.ErrNotReplica: fmt.Printf("Found ErrNotReplica for alias: %v", alias) - masterStatus, err := wr.tmc.DemoteMaster(ctx, tabletInfo.Tablet) + var masterStatus *replicationdatapb.MasterStatus + masterStatus, err = wr.tmc.DemoteMaster(ctx, tabletInfo.Tablet) if err != nil { wr.logger.Warningf("replica %v thinks it's master but we failed to demote it", alias) return @@ -1211,7 +1212,7 @@ func (wr *Wrangler) stopReplicationAndBuildStatusMaps(ctx context.Context, ev *e default: wr.logger.Warningf("failed to get replication status from %v: %v", alias, err) - err = fmt.Errorf("error when getting replication status for alias %v: %v", alias, err) + err = vterrors.Wrapf(err, "error when getting replication status for alias %v: %v", alias, err) } } @@ -1224,7 +1225,7 @@ func (wr *Wrangler) stopReplicationAndBuildStatusMaps(ctx context.Context, ev *e errRecorder := waitOnNMinusOneTablets(groupCancel, len(tabletMap)-unreachableReplicas.Len(), errChan, 1) if len(errRecorder.Errors) > 1 { - return nil, nil, fmt.Errorf("encountered more than one error when trying to stop replication and get positions: %v", errRecorder.Error()) + return nil, nil, vterrors.Wrapf(errRecorder.Error(), "encountered more than one error when trying to stop replication and get positions: %v", errRecorder.Error()) } return statusMap, masterStatusMap, nil } From e166f948d2da0f7b078db8ae3561dcb872a505a9 Mon Sep 17 00:00:00 2001 From: Peter Farr Date: Tue, 18 Aug 2020 18:51:54 -0700 Subject: [PATCH 18/26] Rename to ignoredTablets to match flag. 
Signed-off-by: Peter Farr --- go/vt/wrangler/reparent.go | 18 +++++++++--------- 1 file changed, 9 insertions(+), 9 deletions(-) diff --git a/go/vt/wrangler/reparent.go b/go/vt/wrangler/reparent.go index 5540254e952..147128c990a 100644 --- a/go/vt/wrangler/reparent.go +++ b/go/vt/wrangler/reparent.go @@ -845,7 +845,7 @@ func (wr *Wrangler) chooseNewMaster( // EmergencyReparentShard will make the provided tablet the master for // the shard, when the old master is completely unreachable. -func (wr *Wrangler) EmergencyReparentShard(ctx context.Context, keyspace, shard string, masterElectTabletAlias *topodatapb.TabletAlias, waitReplicasTimeout time.Duration, unreachableReplicas sets.String) (err error) { +func (wr *Wrangler) EmergencyReparentShard(ctx context.Context, keyspace, shard string, masterElectTabletAlias *topodatapb.TabletAlias, waitReplicasTimeout time.Duration, ignoredTablets sets.String) (err error) { // lock the shard actionMsg := emergencyReparentShardOperation if masterElectTabletAlias != nil { @@ -861,7 +861,7 @@ func (wr *Wrangler) EmergencyReparentShard(ctx context.Context, keyspace, shard ev := &events.Reparent{} // do the work - err = wr.emergencyReparentShardLocked(ctx, ev, keyspace, shard, masterElectTabletAlias, waitReplicasTimeout, unreachableReplicas) + err = wr.emergencyReparentShardLocked(ctx, ev, keyspace, shard, masterElectTabletAlias, waitReplicasTimeout, ignoredTablets) if err != nil { event.DispatchUpdate(ev, "failed EmergencyReparentShard: "+err.Error()) } else { @@ -870,7 +870,7 @@ func (wr *Wrangler) EmergencyReparentShard(ctx context.Context, keyspace, shard return err } -func (wr *Wrangler) emergencyReparentShardLocked(ctx context.Context, ev *events.Reparent, keyspace, shard string, masterElectTabletAlias *topodatapb.TabletAlias, waitReplicasTimeout time.Duration, unreachableReplicas sets.String) error { +func (wr *Wrangler) emergencyReparentShardLocked(ctx context.Context, ev *events.Reparent, keyspace, shard string, 
masterElectTabletAlias *topodatapb.TabletAlias, waitReplicasTimeout time.Duration, ignoredTablets sets.String) error { shardInfo, err := wr.ts.GetShard(ctx, keyspace, shard) if err != nil { return err @@ -892,7 +892,7 @@ func (wr *Wrangler) emergencyReparentShardLocked(ctx context.Context, ev *events } } - statusMap, masterStatusMap, err := wr.stopReplicationAndBuildStatusMaps(ctx, ev, tabletMap, waitReplicasTimeout, unreachableReplicas) + statusMap, masterStatusMap, err := wr.stopReplicationAndBuildStatusMaps(ctx, ev, tabletMap, waitReplicasTimeout, ignoredTablets) if err != nil { return err } @@ -1008,13 +1008,13 @@ func (wr *Wrangler) emergencyReparentShardLocked(ctx context.Context, ev *events } } waitOnTablets := func() *concurrency.AllErrorRecorder { - return waitOnNMinusOneTablets(replCancel, len(tabletMap)-unreachableReplicas.Len()-1, errChan, len(tabletMap)) + return waitOnNMinusOneTablets(replCancel, len(tabletMap)-ignoredTablets.Len()-1, errChan, len(tabletMap)) } for alias, tabletInfo := range tabletMap { if alias == newMasterTabletAliasStr { continue - } else if !unreachableReplicas.Has(alias) { + } else if !ignoredTablets.Has(alias) { go handleReplica(alias, tabletInfo) } } @@ -1172,7 +1172,7 @@ func (wr *Wrangler) findValidReparentCandidates(statusMap map[string]*replicatio return validCandidates, nil } -func (wr *Wrangler) stopReplicationAndBuildStatusMaps(ctx context.Context, ev *events.Reparent, tabletMap map[string]*topo.TabletInfo, waitReplicasTimeout time.Duration, unreachableReplicas sets.String) (map[string]*replicationdatapb.StopReplicationStatus, map[string]*replicationdatapb.MasterStatus, error) { +func (wr *Wrangler) stopReplicationAndBuildStatusMaps(ctx context.Context, ev *events.Reparent, tabletMap map[string]*topo.TabletInfo, waitReplicasTimeout time.Duration, ignoredTablets sets.String) (map[string]*replicationdatapb.StopReplicationStatus, map[string]*replicationdatapb.MasterStatus, error) { // Stop replication on all replicas, get 
their current // replication position event.DispatchUpdate(ev, "stop replication on all replicas") @@ -1217,12 +1217,12 @@ func (wr *Wrangler) stopReplicationAndBuildStatusMaps(ctx context.Context, ev *e } for alias, tabletInfo := range tabletMap { - if !unreachableReplicas.Has(alias) { + if !ignoredTablets.Has(alias) { go fillStatus(alias, tabletInfo) } } - errRecorder := waitOnNMinusOneTablets(groupCancel, len(tabletMap)-unreachableReplicas.Len(), errChan, 1) + errRecorder := waitOnNMinusOneTablets(groupCancel, len(tabletMap)-ignoredTablets.Len(), errChan, 1) if len(errRecorder.Errors) > 1 { return nil, nil, vterrors.Wrapf(errRecorder.Error(), "encountered more than one error when trying to stop replication and get positions: %v", errRecorder.Error()) From 3a6fc8997439707d8c3c751db73158a253a78198 Mon Sep 17 00:00:00 2001 From: Peter Farr Date: Tue, 18 Aug 2020 22:51:52 -0700 Subject: [PATCH 19/26] Significant logic re-write to make all replicas wait for relay logs to apply before running competition. Signed-off-by: Peter Farr --- go/vt/wrangler/reparent.go | 113 ++++++++++++++----------------------- 1 file changed, 43 insertions(+), 70 deletions(-) diff --git a/go/vt/wrangler/reparent.go b/go/vt/wrangler/reparent.go index 147128c990a..a8470b9cb77 100644 --- a/go/vt/wrangler/reparent.go +++ b/go/vt/wrangler/reparent.go @@ -910,52 +910,57 @@ func (wr *Wrangler) emergencyReparentShardLocked(ctx context.Context, ev *events return vterrors.Errorf(vtrpcpb.Code_FAILED_PRECONDITION, "no valid candidates for emergency reparent") } - validCandidatesSet := sets.NewString(validCandidates...) 
if masterElectTabletAlias != nil { - if !validCandidatesSet.Has(newMasterTabletAliasStr) { + masterPos, ok := validCandidates[newMasterTabletAliasStr] + if !ok { return vterrors.Errorf(vtrpcpb.Code_FAILED_PRECONDITION, "master elect is either not fully caught up, or has errant GTIDs") } - validCandidatesSet = sets.NewString(newMasterTabletAliasStr) + validCandidates[newMasterTabletAliasStr] = masterPos } - // We run a race among our valid candidates. - // The first candidate (tablet) to succeed at applying its relay logs is the winner. - aliasChan := make(chan string) + errChan := make(chan error) rec := &concurrency.AllErrorRecorder{} groupCtx, groupCancel := context.WithTimeout(ctx, waitReplicasTimeout) defer groupCancel() - for candidate := range validCandidatesSet { + for candidate := range validCandidates { go func(alias string) { - var resultAlias string - defer func() { aliasChan <- resultAlias }() + var err error + defer func() { errChan <- err }() err = wr.WaitForRelayLogsToApply(groupCtx, tabletMap[alias], statusMap[alias]) - if err == nil { - resultAlias = alias - } else { + if err != nil { rec.RecordError(err) } }(candidate) } resultCounter := 0 - var winningTabletAlias string - for alias := range aliasChan { + for waitErr := range errChan { resultCounter++ - if alias != "" && winningTabletAlias == "" { - winningTabletAlias = alias - // We only need the first tablet that succeeds, so we can cancel the other tablets - // and move along. 
+ if waitErr != nil { groupCancel() } - if resultCounter == validCandidatesSet.Len() { + if resultCounter == len(validCandidates) { + groupCancel() break } } - if winningTabletAlias == "" { - return vterrors.Wrapf(rec.Error(), "could not find a valid candidate for new master that applied its relay logs within the provided wait_replicas_timeout: %v", rec.Error()) + if len(rec.Errors) != 0 { + return vterrors.Wrapf(rec.Error(), "could not apply all relay logs within the provided wait_replicas_timeout: %v", rec.Error()) + } + + var winningPosition mysql.Position + for alias, position := range validCandidates { + if winningPosition.IsZero() { + winningPosition = position + newMasterTabletAliasStr = alias + continue + } + if position.AtLeast(winningPosition) { + winningPosition = position + newMasterTabletAliasStr = alias + } } - newMasterTabletAliasStr = winningTabletAlias // Check we still have the topology lock. if err := topo.CheckShardLocked(ctx, keyspace, shard); err != nil { @@ -987,7 +992,7 @@ func (wr *Wrangler) emergencyReparentShardLocked(ctx context.Context, ev *events // - everybody else: reparent to new master, wait for row event.DispatchUpdate(ev, "reparenting all tablets") now := time.Now().UnixNano() - errChan := make(chan error) + errChan = make(chan error) handleMaster := func(alias string, tabletInfo *topo.TabletInfo) error { wr.logger.Infof("populating reparent journal on new master %v", alias) @@ -1007,9 +1012,6 @@ func (wr *Wrangler) emergencyReparentShardLocked(ctx context.Context, ev *events err = vterrors.Wrapf(err, "tablet %v SetMaster failed: %v", alias, err) } } - waitOnTablets := func() *concurrency.AllErrorRecorder { - return waitOnNMinusOneTablets(replCancel, len(tabletMap)-ignoredTablets.Len()-1, errChan, len(tabletMap)) - } for alias, tabletInfo := range tabletMap { if alias == newMasterTabletAliasStr { @@ -1018,7 +1020,6 @@ func (wr *Wrangler) emergencyReparentShardLocked(ctx context.Context, ev *events go handleReplica(alias, 
tabletInfo) } } - defer waitOnTablets() masterErr := handleMaster(newMasterTabletAliasStr, tabletMap[newMasterTabletAliasStr]) if masterErr != nil { @@ -1059,7 +1060,7 @@ func waitOnNMinusOneTablets(ctxCancel context.CancelFunc, tabletCount int, error } // findValidReparentCandidates will find valid candidates for emergency reparent, and if successful, returning them as a list of tablet aliases. -func (wr *Wrangler) findValidReparentCandidates(statusMap map[string]*replicationdatapb.StopReplicationStatus, masterStatusMap map[string]*replicationdatapb.MasterStatus) ([]string, error) { +func (wr *Wrangler) findValidReparentCandidates(statusMap map[string]*replicationdatapb.StopReplicationStatus, masterStatusMap map[string]*replicationdatapb.MasterStatus) (map[string]mysql.Position, error) { // Build out replication status list from proto types. replicationStatusMap := make(map[string]*mysql.ReplicationStatus, len(statusMap)) for alias, protoStatus := range statusMap { @@ -1071,18 +1072,18 @@ func (wr *Wrangler) findValidReparentCandidates(statusMap map[string]*replicatio var gtidBased *bool for alias, status := range replicationStatusMap { if gtidBased == nil { - gtidBased = pointer.BoolPtr(!status.RelayLogPosition.IsZero()) + _, ok := status.RelayLogPosition.GTIDSet.(mysql.Mysql56GTIDSet) + gtidBased = pointer.BoolPtr(ok) } else if !*gtidBased { break } else if status.RelayLogPosition.IsZero() { // Bail. We have an odd one in the bunch. - return nil, vterrors.Errorf(vtrpcpb.Code_UNAVAILABLE, "encountered tablet %v with no relay log position, when at least one other tablet in the status map has a relay log positions", alias) + return nil, vterrors.Errorf(vtrpcpb.Code_UNAVAILABLE, "encountered tablet %v with no relay log position, when at least one other tablet in the status map has GTID based relay log positions", alias) } } - // Create relevant position list for comparison. + // Create relevant position list of errant GTID based positions for later comparison. 
positionMap := make(map[string]mysql.Position) - badTablets := sets.NewString() for alias, status := range replicationStatusMap { // Find errantGTIDs and clean them from status map if relevant. if *gtidBased { @@ -1093,26 +1094,24 @@ func (wr *Wrangler) findValidReparentCandidates(statusMap map[string]*replicatio statusList = append(statusList, s) } } + relayLogGTIDSet, ok := status.RelayLogPosition.GTIDSet.(mysql.Mysql56GTIDSet) + if !ok { + return nil, vterrors.Errorf(vtrpcpb.Code_FAILED_PRECONDITION, "We got a filled in relay log position, but it's not of type Mysql56GTIDSet, even though we've determined we need to use GTID based assessment") + } errantGTIDs, err := status.FindErrantGTIDs(statusList) if err != nil { // Could not find errant GTIDs when we must. return nil, err } - // We'll keep track of bad replicas. Errant free these can still be used to determine high water mark - // but should not be considered for emergency reparent. if len(errantGTIDs) != 0 { - badTablets.Insert(alias) + // Skip inserting this tablet. It's not a valid candidate. + continue } - relayLogGTIDSet, ok := status.RelayLogPosition.GTIDSet.(mysql.Mysql56GTIDSet) - if !ok { - return nil, vterrors.Errorf(vtrpcpb.Code_FAILED_PRECONDITION, "we got a filled in relay log position for a tablet, even though the GTIDSet is not of type Mysql56GTIDSet. We don't know how this could happen") - } - cleanedGTIDSet := relayLogGTIDSet.Difference(errantGTIDs) - cleanedRelayLogPosition := mysql.Position{GTIDSet: cleanedGTIDSet} - positionMap[alias] = cleanedRelayLogPosition + pos := mysql.Position{GTIDSet: relayLogGTIDSet} + positionMap[alias] = pos } else { - positionMap[alias] = status.FileRelayLogPosition + positionMap[alias] = status.FilePosition } } @@ -1143,33 +1142,7 @@ func (wr *Wrangler) findValidReparentCandidates(statusMap map[string]*replicatio } } - // Find a position that is fully caught up when errant GTIDs are taken out of the equation. 
- var winningPosition mysql.Position - for _, position := range positionMap { - if winningPosition.IsZero() { - winningPosition = position - continue - } - if position.AtLeast(winningPosition) { - winningPosition = position - } - } - - // Now that we have a high water mark, we can get a list of all tablets that are errant GTID free - // that hit this high water mark. - var validCandidates []string - for alias, position := range positionMap { - if badTablets.Has(alias) { - // We should exclude tablets we've already determined have errant GTIDs from consideration. - continue - } - - if position.AtLeast(winningPosition) { - validCandidates = append(validCandidates, alias) - } - } - - return validCandidates, nil + return positionMap, nil } func (wr *Wrangler) stopReplicationAndBuildStatusMaps(ctx context.Context, ev *events.Reparent, tabletMap map[string]*topo.TabletInfo, waitReplicasTimeout time.Duration, ignoredTablets sets.String) (map[string]*replicationdatapb.StopReplicationStatus, map[string]*replicationdatapb.MasterStatus, error) { From 96e484385f954de28bb3cb22c547fca8b2be65ba Mon Sep 17 00:00:00 2001 From: Peter Farr Date: Tue, 18 Aug 2020 23:18:58 -0700 Subject: [PATCH 20/26] Make sure we run competition for most caught up. 
Signed-off-by: Peter Farr --- go/vt/wrangler/reparent.go | 29 +++++------------------------ 1 file changed, 5 insertions(+), 24 deletions(-) diff --git a/go/vt/wrangler/reparent.go b/go/vt/wrangler/reparent.go index a8470b9cb77..46bd5f107c5 100644 --- a/go/vt/wrangler/reparent.go +++ b/go/vt/wrangler/reparent.go @@ -1111,35 +1111,16 @@ func (wr *Wrangler) findValidReparentCandidates(statusMap map[string]*replicatio pos := mysql.Position{GTIDSet: relayLogGTIDSet} positionMap[alias] = pos } else { - positionMap[alias] = status.FilePosition + positionMap[alias] = status.Position } } for alias, masterStatus := range masterStatusMap { - if *gtidBased { - executedPosition, err := mysql.DecodePosition(masterStatus.Position) - if err != nil { - return nil, err - } - positionMap[alias] = executedPosition - } else { - executedFilePosition, err := mysql.DecodePosition(masterStatus.FilePosition) - if err != nil { - return nil, err - } - positionMap[alias] = executedFilePosition - } - } - - if !*gtidBased { - // If all positions are not comparable, we can't do file based comparisons at all. - filePositionList := make([]mysql.Position, 0, len(replicationStatusMap)) - for _, p := range positionMap { - filePositionList = append(filePositionList, p) - } - if !mysql.AllPositionsComparable(filePositionList) { - return nil, fmt.Errorf("we can't compare all file based positions") + executedPosition, err := mysql.DecodePosition(masterStatus.Position) + if err != nil { + return nil, err } + positionMap[alias] = executedPosition } return positionMap, nil From fff559f99050d4eb126c113c093b919a35ceebc5 Mon Sep 17 00:00:00 2001 From: Peter Farr Date: Tue, 18 Aug 2020 23:36:49 -0700 Subject: [PATCH 21/26] Fixed unit tests and refactored logic per pairing session. 
Signed-off-by: Peter Farr --- go/test/endtoend/reparent/reparent_test.go | 5 +-- go/vt/wrangler/reparent.go | 35 +++++++------------ .../testlib/emergency_reparent_shard_test.go | 12 +++++-- 3 files changed, 26 insertions(+), 26 deletions(-) diff --git a/go/test/endtoend/reparent/reparent_test.go b/go/test/endtoend/reparent/reparent_test.go index 5bd0713fa2b..a2f1c21c7fa 100644 --- a/go/test/endtoend/reparent/reparent_test.go +++ b/go/test/endtoend/reparent/reparent_test.go @@ -119,16 +119,17 @@ func TestReparentDownMaster(t *testing.T) { require.Error(t, err) // Run forced reparent operation, this should now proceed unimpeded. - err = clusterInstance.VtctlclientProcess.ExecuteCommand( + out, err := clusterInstance.VtctlclientProcess.ExecuteCommandWithOutput( "EmergencyReparentShard", "-keyspace_shard", keyspaceShard, "-new_master", tablet62044.Alias, "-wait_replicas_timeout", "30s") + log.Infof("EmergencyReparentShard Output: %v", out) require.Nil(t, err) require.NoError(t, err) // Check that old master tablet is left around for human intervention. 
- out, err := clusterInstance.VtctlclientProcess.ExecuteCommandWithOutput("Validate") + out, err = clusterInstance.VtctlclientProcess.ExecuteCommandWithOutput("Validate") require.Error(t, err) require.Contains(t, out, "already has master") diff --git a/go/vt/wrangler/reparent.go b/go/vt/wrangler/reparent.go index 46bd5f107c5..d0b64cc9323 100644 --- a/go/vt/wrangler/reparent.go +++ b/go/vt/wrangler/reparent.go @@ -883,15 +883,6 @@ func (wr *Wrangler) emergencyReparentShardLocked(ctx context.Context, ev *events return err } - var newMasterTabletAliasStr string - if masterElectTabletAlias != nil { - newMasterTabletAliasStr = topoproto.TabletAliasString(masterElectTabletAlias) - _, ok := tabletMap[newMasterTabletAliasStr] - if !ok { - return vterrors.Errorf(vtrpcpb.Code_FAILED_PRECONDITION, "master-elect tablet %v is not in the shard", newMasterTabletAliasStr) - } - } - statusMap, masterStatusMap, err := wr.stopReplicationAndBuildStatusMaps(ctx, ev, tabletMap, waitReplicasTimeout, ignoredTablets) if err != nil { return err @@ -910,14 +901,6 @@ func (wr *Wrangler) emergencyReparentShardLocked(ctx context.Context, ev *events return vterrors.Errorf(vtrpcpb.Code_FAILED_PRECONDITION, "no valid candidates for emergency reparent") } - if masterElectTabletAlias != nil { - masterPos, ok := validCandidates[newMasterTabletAliasStr] - if !ok { - return vterrors.Errorf(vtrpcpb.Code_FAILED_PRECONDITION, "master elect is either not fully caught up, or has errant GTIDs") - } - validCandidates[newMasterTabletAliasStr] = masterPos - } - errChan := make(chan error) rec := &concurrency.AllErrorRecorder{} groupCtx, groupCancel := context.WithTimeout(ctx, waitReplicasTimeout) @@ -926,11 +909,7 @@ func (wr *Wrangler) emergencyReparentShardLocked(ctx context.Context, ev *events go func(alias string) { var err error defer func() { errChan <- err }() - err = wr.WaitForRelayLogsToApply(groupCtx, tabletMap[alias], statusMap[alias]) - if err != nil { - rec.RecordError(err) - } }(candidate) } @@ 
-938,10 +917,10 @@ func (wr *Wrangler) emergencyReparentShardLocked(ctx context.Context, ev *events for waitErr := range errChan { resultCounter++ if waitErr != nil { + rec.RecordError(waitErr) groupCancel() } if resultCounter == len(validCandidates) { - groupCancel() break } } @@ -950,6 +929,7 @@ func (wr *Wrangler) emergencyReparentShardLocked(ctx context.Context, ev *events } var winningPosition mysql.Position + var newMasterTabletAliasStr string for alias, position := range validCandidates { if winningPosition.IsZero() { winningPosition = position @@ -962,6 +942,17 @@ func (wr *Wrangler) emergencyReparentShardLocked(ctx context.Context, ev *events } } + if masterElectTabletAlias != nil { + newMasterTabletAliasStr = topoproto.TabletAliasString(masterElectTabletAlias) + masterPos, ok := validCandidates[newMasterTabletAliasStr] + if !ok { + return vterrors.Errorf(vtrpcpb.Code_FAILED_PRECONDITION, "master elect %v has errant GTIDs", newMasterTabletAliasStr) + } + if !masterPos.AtLeast(winningPosition) { + return vterrors.Errorf(vtrpcpb.Code_FAILED_PRECONDITION, "master elect: %v at position %v, is not fully caught up. Winning position: %v", newMasterTabletAliasStr, masterPos, winningPosition) + } + } + // Check we still have the topology lock. 
if err := topo.CheckShardLocked(ctx, keyspace, shard); err != nil { return vterrors.Wrapf(err, "lost topology lock, aborting: %v", err) diff --git a/go/vt/wrangler/testlib/emergency_reparent_shard_test.go b/go/vt/wrangler/testlib/emergency_reparent_shard_test.go index ee061108757..d7d2c43f830 100644 --- a/go/vt/wrangler/testlib/emergency_reparent_shard_test.go +++ b/go/vt/wrangler/testlib/emergency_reparent_shard_test.go @@ -123,6 +123,7 @@ func TestEmergencyReparentShard(t *testing.T) { goodReplica1.FakeMysqlDaemon.CurrentMasterFilePosition = mysql.Position{ GTIDSet: goodReplica1RelayLogPos, } + goodReplica1.FakeMysqlDaemon.WaitMasterPosition = goodReplica1.FakeMysqlDaemon.CurrentMasterFilePosition goodReplica1.FakeMysqlDaemon.SetMasterInput = topoproto.MysqlAddr(newMaster.Tablet) goodReplica1.FakeMysqlDaemon.ExpectedExecuteSuperQueryList = []string{ "STOP SLAVE IO_THREAD", @@ -149,6 +150,7 @@ func TestEmergencyReparentShard(t *testing.T) { goodReplica2.FakeMysqlDaemon.CurrentMasterFilePosition = mysql.Position{ GTIDSet: goodReplica2RelayLogPos, } + goodReplica2.FakeMysqlDaemon.WaitMasterPosition = goodReplica2.FakeMysqlDaemon.CurrentMasterFilePosition goodReplica2.FakeMysqlDaemon.SetMasterInput = topoproto.MysqlAddr(newMaster.Tablet) goodReplica2.StartActionLoop(t, wr) goodReplica2.FakeMysqlDaemon.ExpectedExecuteSuperQueryList = []string{ @@ -184,7 +186,9 @@ func TestEmergencyReparentShard(t *testing.T) { // TestEmergencyReparentShardMasterElectNotBest tries to emergency reparent // to a host that is not the latest in replication position. 
func TestEmergencyReparentShardMasterElectNotBest(t *testing.T) { - ctx := context.Background() + ctx, cancel := context.WithTimeout(context.Background(), time.Second*30) + defer cancel() + ts := memorytopo.NewServer("cell1", "cell2") wr := wrangler.New(logutil.NewConsoleLogger(), ts, tmclient.NewTabletManagerClient()) @@ -220,6 +224,7 @@ func TestEmergencyReparentShardMasterElectNotBest(t *testing.T) { newMaster.FakeMysqlDaemon.CurrentMasterFilePosition = mysql.Position{ GTIDSet: newMasterRelayLogPos, } + newMaster.FakeMysqlDaemon.WaitMasterPosition = newMaster.FakeMysqlDaemon.CurrentMasterFilePosition newMaster.FakeMysqlDaemon.ExpectedExecuteSuperQueryList = []string{ "STOP SLAVE IO_THREAD", } @@ -257,6 +262,7 @@ func TestEmergencyReparentShardMasterElectNotBest(t *testing.T) { moreAdvancedReplica.FakeMysqlDaemon.CurrentMasterFilePosition = mysql.Position{ GTIDSet: moreAdvancedReplicaLogPos, } + moreAdvancedReplica.FakeMysqlDaemon.WaitMasterPosition = moreAdvancedReplica.FakeMysqlDaemon.CurrentMasterFilePosition moreAdvancedReplica.FakeMysqlDaemon.ExpectedExecuteSuperQueryList = []string{ "STOP SLAVE IO_THREAD", } @@ -265,8 +271,10 @@ func TestEmergencyReparentShardMasterElectNotBest(t *testing.T) { // run EmergencyReparentShard err := wr.EmergencyReparentShard(ctx, newMaster.Tablet.Keyspace, newMaster.Tablet.Shard, newMaster.Tablet.Alias, 10*time.Second, sets.NewString()) + cancel() + assert.Error(t, err) - assert.Contains(t, err.Error(), "master elect is either not fully caught up, or") + assert.Contains(t, err.Error(), "is not fully caught up") // check what was run err = newMaster.FakeMysqlDaemon.CheckSuperQueryList() require.NoError(t, err) From d239ff5cfd8b7e6c6090ff8b9b3cd9676b4fb3e8 Mon Sep 17 00:00:00 2001 From: Peter Farr Date: Thu, 20 Aug 2020 13:04:57 -0700 Subject: [PATCH 22/26] Ensure we use Wrapf for ALL error returns. 
Signed-off-by: Peter Farr --- go/vt/wrangler/reparent.go | 6 +++--- 1 file changed, 3 insertions(+), 3 deletions(-) diff --git a/go/vt/wrangler/reparent.go b/go/vt/wrangler/reparent.go index d0b64cc9323..60cc9c45310 100644 --- a/go/vt/wrangler/reparent.go +++ b/go/vt/wrangler/reparent.go @@ -880,17 +880,17 @@ func (wr *Wrangler) emergencyReparentShardLocked(ctx context.Context, ev *events event.DispatchUpdate(ev, "reading all tablets") tabletMap, err := wr.ts.GetTabletMapForShard(ctx, keyspace, shard) if err != nil { - return err + return vterrors.Wrapf(err, "failed to get tablet map for shard %v in keyspace %v: %v", shard, keyspace, err) } statusMap, masterStatusMap, err := wr.stopReplicationAndBuildStatusMaps(ctx, ev, tabletMap, waitReplicasTimeout, ignoredTablets) if err != nil { - return err + return vterrors.Wrapf(err, "failed to stop replication and build status maps: %v", err) } // Check we still have the topology lock. if err := topo.CheckShardLocked(ctx, keyspace, shard); err != nil { - return vterrors.Errorf(vtrpcpb.Code_UNAVAILABLE, "lost topology lock, aborting: %v", err) + return vterrors.Wrapf(err, "lost topology lock, aborting: %v", err) } validCandidates, err := wr.findValidReparentCandidates(statusMap, masterStatusMap) From f445f7978739698081cd737bdbe3bedcd4b23fdf Mon Sep 17 00:00:00 2001 From: Peter Farr Date: Thu, 20 Aug 2020 13:08:46 -0700 Subject: [PATCH 23/26] Get rid of unnecessary subcontext. 
Signed-off-by: Peter Farr --- go/vt/wrangler/reparent.go | 6 ++---- 1 file changed, 2 insertions(+), 4 deletions(-) diff --git a/go/vt/wrangler/reparent.go b/go/vt/wrangler/reparent.go index 60cc9c45310..89e57cf763a 100644 --- a/go/vt/wrangler/reparent.go +++ b/go/vt/wrangler/reparent.go @@ -1133,15 +1133,13 @@ func (wr *Wrangler) stopReplicationAndBuildStatusMaps(ctx context.Context, ev *e defer func() { errChan <- err }() wr.logger.Infof("getting replication position from %v", alias) - ctx, cancel := context.WithCancel(groupCtx) - defer cancel() var stopReplicationStatus *replicationdatapb.StopReplicationStatus - _, stopReplicationStatus, err = wr.tmc.StopReplicationAndGetStatus(ctx, tabletInfo.Tablet, replicationdatapb.StopReplicationMode_IOTHREADONLY) + _, stopReplicationStatus, err = wr.tmc.StopReplicationAndGetStatus(groupCtx, tabletInfo.Tablet, replicationdatapb.StopReplicationMode_IOTHREADONLY) switch err { case mysql.ErrNotReplica: fmt.Printf("Found ErrNotReplica for alias: %v", alias) var masterStatus *replicationdatapb.MasterStatus - masterStatus, err = wr.tmc.DemoteMaster(ctx, tabletInfo.Tablet) + masterStatus, err = wr.tmc.DemoteMaster(groupCtx, tabletInfo.Tablet) if err != nil { wr.logger.Warningf("replica %v thinks it's master but we failed to demote it", alias) return From dcfe4f1e67ae840aeb99092967718aa3eb0f8c8f Mon Sep 17 00:00:00 2001 From: Peter Farr Date: Thu, 20 Aug 2020 13:27:00 -0700 Subject: [PATCH 24/26] More error improvement. 
Signed-off-by: Peter Farr --- go/vt/wrangler/reparent.go | 8 ++++---- 1 file changed, 4 insertions(+), 4 deletions(-) diff --git a/go/vt/wrangler/reparent.go b/go/vt/wrangler/reparent.go index 89e57cf763a..e9dc22a0eb5 100644 --- a/go/vt/wrangler/reparent.go +++ b/go/vt/wrangler/reparent.go @@ -1087,7 +1087,7 @@ func (wr *Wrangler) findValidReparentCandidates(statusMap map[string]*replicatio } relayLogGTIDSet, ok := status.RelayLogPosition.GTIDSet.(mysql.Mysql56GTIDSet) if !ok { - return nil, vterrors.Errorf(vtrpcpb.Code_FAILED_PRECONDITION, "We got a filled in relay log position, but it's not of type Mysql56GTIDSet, even though we've determined we need to use GTID based assessment") + return nil, vterrors.Errorf(vtrpcpb.Code_FAILED_PRECONDITION, "we got a filled in relay log position, but it's not of type Mysql56GTIDSet, even though we've determined we need to use GTID based assessment") } errantGTIDs, err := status.FindErrantGTIDs(statusList) if err != nil { @@ -1109,7 +1109,7 @@ func (wr *Wrangler) findValidReparentCandidates(statusMap map[string]*replicatio for alias, masterStatus := range masterStatusMap { executedPosition, err := mysql.DecodePosition(masterStatus.Position) if err != nil { - return nil, err + return nil, vterrors.Wrapf(err, "could not decode a master status executed position for tablet %v: %v", alias, err) } positionMap[alias] = executedPosition } @@ -1129,7 +1129,7 @@ func (wr *Wrangler) stopReplicationAndBuildStatusMaps(ctx context.Context, ev *e groupCtx, groupCancel := context.WithTimeout(ctx, waitReplicasTimeout) defer groupCancel() fillStatus := func(alias string, tabletInfo *topo.TabletInfo) { - err := fmt.Errorf("fillStatus did not successfully complete") + err := vterrors.Errorf(vtrpcpb.Code_UNAVAILABLE, "fillStatus did not successfully complete") defer func() { errChan <- err }() wr.logger.Infof("getting replication position from %v", alias) @@ -1137,11 +1137,11 @@ func (wr *Wrangler) stopReplicationAndBuildStatusMaps(ctx 
context.Context, ev *e _, stopReplicationStatus, err = wr.tmc.StopReplicationAndGetStatus(groupCtx, tabletInfo.Tablet, replicationdatapb.StopReplicationMode_IOTHREADONLY) switch err { case mysql.ErrNotReplica: - fmt.Printf("Found ErrNotReplica for alias: %v", alias) var masterStatus *replicationdatapb.MasterStatus masterStatus, err = wr.tmc.DemoteMaster(groupCtx, tabletInfo.Tablet) if err != nil { wr.logger.Warningf("replica %v thinks it's master but we failed to demote it", alias) + err = vterrors.Wrapf(err, "replica %v thinks it's master but we failed to demote it: %v", alias, err) return } mu.Lock() From 04a1a607d638a9349d9801ba5ebcef17ba62fcee Mon Sep 17 00:00:00 2001 From: Peter Farr Date: Thu, 20 Aug 2020 13:28:15 -0700 Subject: [PATCH 25/26] Rename per review suggestion. Signed-off-by: Peter Farr --- go/test/endtoend/reparent/reparent_test.go | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/go/test/endtoend/reparent/reparent_test.go b/go/test/endtoend/reparent/reparent_test.go index a2f1c21c7fa..c14643689d3 100644 --- a/go/test/endtoend/reparent/reparent_test.go +++ b/go/test/endtoend/reparent/reparent_test.go @@ -281,7 +281,7 @@ func TestReparentNoChoiceDownMaster(t *testing.T) { killTablets(t) } -func TestReparentUnreachableReplicas(t *testing.T) { +func TestReparentIgnoreReplicas(t *testing.T) { defer cluster.PanicHandler(t) ctx := context.Background() From c3c24f5f12c12326477e5e8d68e81c096cdfb4b6 Mon Sep 17 00:00:00 2001 From: Peter Farr Date: Thu, 20 Aug 2020 13:39:19 -0700 Subject: [PATCH 26/26] Remove InitTablet calls. They are unnecessary now. 
Signed-off-by: Peter Farr --- go/test/endtoend/reparent/reparent_test.go | 7 ------- 1 file changed, 7 deletions(-) diff --git a/go/test/endtoend/reparent/reparent_test.go b/go/test/endtoend/reparent/reparent_test.go index c14643689d3..bc5933e0636 100644 --- a/go/test/endtoend/reparent/reparent_test.go +++ b/go/test/endtoend/reparent/reparent_test.go @@ -78,10 +78,6 @@ func TestReparentDownMaster(t *testing.T) { // Reset status, don't wait for the tablet status. We will check it later tablet.VttabletProcess.ServingStatus = "" - // Init Tablet - err = clusterInstance.VtctlclientProcess.InitTablet(&tablet, tablet.Cell, keyspaceName, hostname, shardName) - require.NoError(t, err) - // Start the tablet err = tablet.VttabletProcess.Setup() require.NoError(t, err) @@ -184,9 +180,6 @@ func TestReparentNoChoiceDownMaster(t *testing.T) { // Reset status, don't wait for the tablet status. We will check it later tablet.VttabletProcess.ServingStatus = "" - // Init Tablet - err = clusterInstance.VtctlclientProcess.InitTablet(&tablet, tablet.Cell, keyspaceName, hostname, shardName) - require.NoError(t, err) // Start the tablet err = tablet.VttabletProcess.Setup()