go/vt/wrangler/reparent.go: 81 changes (73 additions and 8 deletions)

@@ -976,50 +976,108 @@ func (wr *Wrangler) emergencyReparentShardLocked(ctx context.Context, ev *events
     replCtx, replCancel := context.WithTimeout(ctx, waitReplicasTimeout)
     defer replCancel()
 
+    replSuccessCtx, replSuccessCancel := context.WithCancel(context.Background())
+    allReplicasDoneCtx, allReplicasDoneCancel := context.WithCancel(context.Background())
+
     // Reset replication on all replicas to point to the new master, and
     // insert test row in the new master.
     // Go through all the tablets:
     // - new master: populate the reparent journal
     // - everybody else: reparent to new master, wait for row
     event.DispatchUpdate(ev, "reparenting all tablets")
     now := time.Now().UnixNano()
-    errChan = make(chan error)
+    replWg := sync.WaitGroup{}
+    // we will reuse the concurrency.AllErrorRecorder for the actual reparent
+    // starting here because we've validated above that there were zero errors
+    // up to this point
 
     handleMaster := func(alias string, tabletInfo *topo.TabletInfo) error {
         wr.logger.Infof("populating reparent journal on new master %v", alias)
         return wr.tmc.PopulateReparentJournal(replCtx, tabletInfo.Tablet, now, emergencyReparentShardOperation, tabletMap[newMasterTabletAliasStr].Alias, rp)
     }
     handleReplica := func(alias string, tabletInfo *topo.TabletInfo) {
-        var err error
-        defer func() { errChan <- err }()
+        defer replWg.Done()
 
         wr.logger.Infof("setting new master on replica %v", alias)
         forceStart := false
         if status, ok := statusMap[alias]; ok {
-            forceStart = replicaWasRunning(status)
+            fs, err := replicaWasRunning(status)
+            if err != nil {
+                err = vterrors.Wrapf(err, "tablet %v could not determine StopReplicationStatus: %v", alias, err)
+                rec.RecordError(err)
+
+                return
+            }
+
+            forceStart = fs
         }
-        err = wr.tmc.SetMaster(replCtx, tabletInfo.Tablet, tabletMap[newMasterTabletAliasStr].Alias, now, "", forceStart)
+        err := wr.tmc.SetMaster(replCtx, tabletInfo.Tablet, tabletMap[newMasterTabletAliasStr].Alias, now, "", forceStart)
         if err != nil {
             err = vterrors.Wrapf(err, "tablet %v SetMaster failed: %v", alias, err)
             rec.RecordError(err)
             return
         }
+
+        // Signal that at least one goroutine succeeded to SetMaster.
+        replSuccessCancel()
     }

+    numReplicas := 0
+
     for alias, tabletInfo := range tabletMap {
         if alias == newMasterTabletAliasStr {
             continue
         } else if !ignoredTablets.Has(alias) {
+            replWg.Add(1)
+            numReplicas++
             go handleReplica(alias, tabletInfo)
         }
     }

+    // Spin up a background goroutine to wait until all replica goroutines
+    // have finished. Polling this way allows emergencyReparentShardLocked to
+    // return success as soon as (a) the primary successfully populates its
+    // reparent journal and (b) at least one replica successfully begins
+    // replicating.
+    //
+    // If we were to follow the more common pattern of blocking on replWg.Wait()
+    // in the main body of emergencyReparentShardLocked, we would be bound to
+    // the time of the slowest replica, instead of the time of the fastest
+    // successful replica, and we want ERS to be fast.
+    go func() {
+        replWg.Wait()
+        allReplicasDoneCancel()
+    }()

     masterErr := handleMaster(newMasterTabletAliasStr, tabletMap[newMasterTabletAliasStr])
     if masterErr != nil {
         wr.logger.Warningf("master failed to PopulateReparentJournal")
         replCancel()
         return vterrors.Wrapf(masterErr, "failed to PopulateReparentJournal on master: %v", masterErr)
     }

-    return nil
+    select {
+    case <-replSuccessCtx.Done():
+        // At least one replica was able to SetMaster successfully.
+        return nil
+    case <-allReplicasDoneCtx.Done():
+        // There are certain timing issues between replSuccessCtx.Done firing
+        // and allReplicasDoneCtx.Done firing, so we check again whether truly
+        // all replicas failed (i.e. all numReplicas goroutines recorded an
+        // error) or one or more actually managed to succeed.
+        errCount := len(rec.Errors)
+
+        switch {
+        case errCount > numReplicas:
+            // Technically, rec.Errors should never be greater than numReplicas,
+            // but it's better to err on the side of caution here, and to be
+            // explicit that this case is doubly unexpected.
+            return vterrors.Wrapf(rec.Error(), "received more errors (= %d) than replicas (= %d), which should be impossible: %v", errCount, numReplicas, rec.Error())
+        case errCount == numReplicas:
+            return vterrors.Wrapf(rec.Error(), "%d replica(s) failed: %v", numReplicas, rec.Error())
+        default:
+            return nil
+        }
+    }
 }

 // waitOnNMinusOneTablets will wait until N-1 tablets have responded via a supplied error channel. In the case that N-1 tablets have responded,
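The hunk above boils down to a "first success, or all done" concurrency pattern: every replica goroutine cancels replSuccessCtx as soon as it succeeds, a background waiter cancels allReplicasDoneCtx once the WaitGroup drains, and a final select returns on whichever fires first. The standalone sketch below models that pattern only; it is not the Vitess implementation. firstSuccessOrAllDone and its workers slice are hypothetical names, and errors.Join (Go 1.20+) stands in for Vitess's concurrency.AllErrorRecorder.

package main

import (
    "context"
    "errors"
    "fmt"
    "sync"
)

// firstSuccessOrAllDone runs one goroutine per worker and returns nil as soon
// as any worker succeeds, or an aggregate error once every worker has failed.
func firstSuccessOrAllDone(ctx context.Context, workers []func(context.Context) error) error {
    successCtx, successCancel := context.WithCancel(context.Background())
    allDoneCtx, allDoneCancel := context.WithCancel(context.Background())
    defer successCancel()
    defer allDoneCancel()

    var (
        wg   sync.WaitGroup
        mu   sync.Mutex
        errs []error
    )

    for _, work := range workers {
        wg.Add(1)
        go func(work func(context.Context) error) {
            defer wg.Done()
            if err := work(ctx); err != nil {
                mu.Lock()
                errs = append(errs, err)
                mu.Unlock()
                return
            }
            // Signal that at least one worker succeeded (cancel is idempotent).
            successCancel()
        }(work)
    }

    // Background waiter, so the select below can return on the fastest
    // success instead of blocking until the slowest worker finishes.
    go func() {
        wg.Wait()
        allDoneCancel()
    }()

    select {
    case <-successCtx.Done():
        return nil
    case <-allDoneCtx.Done():
        // Both channels can become ready around the same time, so re-check
        // whether every worker really failed before reporting an error.
        mu.Lock()
        defer mu.Unlock()
        if len(errs) == len(workers) {
            return fmt.Errorf("all %d workers failed: %w", len(workers), errors.Join(errs...))
        }
        return nil
    }
}

func main() {
    err := firstSuccessOrAllDone(context.Background(), []func(context.Context) error{
        func(context.Context) error { return errors.New("replica 1 failed") },
        func(context.Context) error { return nil }, // one success is enough
    })
    fmt.Println(err) // <nil>
}

As in the diff, the allDoneCtx branch re-checks the error count: when the last worker to finish is also the first to succeed, both Done channels can be ready at once, and select chooses between ready cases at random.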
@@ -1233,6 +1291,13 @@ func (wr *Wrangler) TabletExternallyReparented(ctx context.Context, newMasterAli
     return nil
 }
 
-func replicaWasRunning(stopReplicationStatus *replicationdatapb.StopReplicationStatus) bool {
-    return stopReplicationStatus.Before.IoThreadRunning || stopReplicationStatus.Before.SqlThreadRunning
+// replicaWasRunning returns true if a StopReplicationStatus indicates that the
+// replica had running replication threads before being stopped. It returns an
+// error if the status or its Before state is nil.
+func replicaWasRunning(stopReplicationStatus *replicationdatapb.StopReplicationStatus) (bool, error) {
+    if stopReplicationStatus == nil || stopReplicationStatus.Before == nil {
+        return false, vterrors.Errorf(vtrpcpb.Code_INVALID_ARGUMENT, "could not determine Before state of StopReplicationStatus %v", stopReplicationStatus)
+    }
+
+    return stopReplicationStatus.Before.IoThreadRunning || stopReplicationStatus.Before.SqlThreadRunning, nil
 }
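The second hunk turns replicaWasRunning into a guarded accessor that reports a nil Before state instead of dereferencing it. The toy reproduction below makes that behavior concrete; the structs are simplified stand-ins for the replicationdatapb messages, and plain fmt.Errorf replaces vterrors.

package main

import "fmt"

// Stand-ins for the replicationdatapb messages used in the diff.
type ReplicationStatus struct {
    IoThreadRunning  bool
    SqlThreadRunning bool
}

type StopReplicationStatus struct {
    Before *ReplicationStatus
}

// replicaWasRunning mirrors the guarded version above: a nil status or a nil
// Before state yields an explicit error instead of a nil-pointer panic.
func replicaWasRunning(s *StopReplicationStatus) (bool, error) {
    if s == nil || s.Before == nil {
        return false, fmt.Errorf("could not determine Before state of StopReplicationStatus %v", s)
    }
    return s.Before.IoThreadRunning || s.Before.SqlThreadRunning, nil
}

func main() {
    if _, err := replicaWasRunning(&StopReplicationStatus{}); err != nil {
        fmt.Println(err) // the old bool-only version would have panicked here
    }

    running, _ := replicaWasRunning(&StopReplicationStatus{
        Before: &ReplicationStatus{IoThreadRunning: true},
    })
    fmt.Println(running) // true
}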