go/vt/vtctl/reparentutil/emergency_reparenter.go (88 changes: 74 additions & 14 deletions)
@@ -19,6 +19,7 @@ package reparentutil
 import (
 	"context"
 	"fmt"
+	"sync"
 	"time"

 	"k8s.io/apimachinery/pkg/util/sets"
@@ -150,47 +151,84 @@ func (erp *EmergencyReparenter) promoteNewPrimary(

 	event.DispatchUpdate(ev, "reparenting all tablets")

-	// (@ajm188) - A question while migrating: Is this by design? By my read,
-	// there's nothing consuming that error channel, meaning any replica that
-	// fails to SetMaster will actually block trying to send to the errCh. In
-	// addition, the only way an operator will ever notice these errors will be
-	// in the logs on the tablet, and not from any error propagation in
-	// vtctl/wrangler, so a shard will continue to attempt to serve (probably?)
-	// after a partially-failed ERS.
+	// Create a context and cancel function to watch for the first successful
+	// SetMaster call on a replica. We use a background context so that this
+	// context is only ever Done when its cancel is called by the background
+	// goroutine we're about to spin up.
+	//
+	// Similarly, create a context and cancel for the replica waiter goroutine
+	// to signal when all replica goroutines have finished. In the case where at
+	// least one replica succeeds, replSuccessCtx will be canceled first, while
+	// allReplicasDoneCtx is guaranteed to be canceled within
+	// opts.WaitReplicasTimeout plus some jitter.
Collaborator:

Very nice. IIUC, the calls to SetMaster are bounded by WaitReplicasTimeout which guarantees that allReplicasDoneCancel is eventually called.

Contributor (author):

Precisely!
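To make the pattern under discussion concrete, here is a minimal, runnable Go sketch of the same two-context signaling. This is not the Vitess code: doWork, the one-second timeout, and the worker count are invented stand-ins for SetMaster, WaitReplicasTimeout, and the replica set. One context is canceled on the first success, the other once every worker has returned, and because each call is bounded by a timeout, the "all done" cancel is guaranteed to fire eventually, just as the reviewer notes above.

```go
package main

import (
	"context"
	"errors"
	"fmt"
	"math/rand"
	"sync"
	"time"
)

// doWork is a hypothetical stand-in for a per-replica RPC such as SetMaster.
// It succeeds, fails, or gives up when the bounding context expires.
func doWork(ctx context.Context, i int) error {
	select {
	case <-time.After(time.Duration(rand.Intn(150)) * time.Millisecond):
		if i%2 == 0 {
			return errors.New("simulated failure")
		}
		return nil
	case <-ctx.Done():
		return ctx.Err()
	}
}

func main() {
	// Canceled by the first worker that succeeds.
	successCtx, successCancel := context.WithCancel(context.Background())
	// Canceled once every worker has returned.
	allDoneCtx, allDoneCancel := context.WithCancel(context.Background())

	// Every call is bounded by this timeout (the analogue of
	// WaitReplicasTimeout), so the WaitGroup below, and therefore
	// allDoneCancel, is guaranteed to fire eventually.
	workCtx, workCancel := context.WithTimeout(context.Background(), time.Second)
	defer workCancel()

	var wg sync.WaitGroup
	for i := 0; i < 5; i++ {
		wg.Add(1)
		go func(i int) {
			defer wg.Done()
			if err := doWork(workCtx, i); err != nil {
				return
			}
			successCancel() // signal the first success; repeated calls are no-ops
		}(i)
	}

	go func() {
		wg.Wait()
		allDoneCancel()
	}()

	select {
	case <-successCtx.Done():
		fmt.Println("at least one worker succeeded")
	case <-allDoneCtx.Done():
		fmt.Println("all workers finished without a success")
	}
}
```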

+	replSuccessCtx, replSuccessCancel := context.WithCancel(context.Background())
+	allReplicasDoneCtx, allReplicasDoneCancel := context.WithCancel(context.Background())

 	now := time.Now().UnixNano()
-	errCh := make(chan error)
+	replWg := sync.WaitGroup{}
+	rec := concurrency.AllErrorRecorder{}

 	handlePrimary := func(alias string, ti *topo.TabletInfo) error {
 		erp.logger.Infof("populating reparent journal on new master %v", alias)
 		return erp.tmc.PopulateReparentJournal(replCtx, ti.Tablet, now, opts.lockAction, newPrimaryTabletInfo.Alias, rp)
 	}

 	handleReplica := func(alias string, ti *topo.TabletInfo) {
+		defer replWg.Done()
 		erp.logger.Infof("setting new master on replica %v", alias)

-		var err error
-		defer func() { errCh <- err }()
-
 		forceStart := false
 		if status, ok := statusMap[alias]; ok {
-			forceStart = ReplicaWasRunning(status)
+			fs, err := ReplicaWasRunning(status)
+			if err != nil {
+				err = vterrors.Wrapf(err, "tablet %v could not determine StopReplicationStatus: %v", alias, err)
+				rec.RecordError(err)
+
+				return
+			}
+
+			forceStart = fs
 		}

-		err = erp.tmc.SetMaster(replCtx, ti.Tablet, newPrimaryTabletInfo.Alias, now, "", forceStart)
+		err := erp.tmc.SetMaster(replCtx, ti.Tablet, newPrimaryTabletInfo.Alias, now, "", forceStart)
+		if err != nil {
+			err = vterrors.Wrapf(err, "tablet %v SetMaster failed: %v", alias, err)
+			rec.RecordError(err)
+
+			return
+		}
+
+		// Signal that at least one goroutine succeeded in its SetMaster call.
+		replSuccessCancel()
 	}

+	numReplicas := 0
+
 	for alias, ti := range tabletMap {
 		switch {
 		case alias == newPrimaryTabletAlias:
 			continue
 		case !opts.IgnoreReplicas.Has(alias):
+			replWg.Add(1)
+			numReplicas++
 			go handleReplica(alias, ti)
 		}
 	}

+	// Spin up a background goroutine to wait until all replica goroutines have
+	// finished. Polling this way allows us to have promoteNewPrimary return
+	// success as soon as (a) the primary successfully populates its reparent
+	// journal and (b) at least one replica successfully begins replicating.
+	//
+	// If we were to follow the more common pattern of blocking on replWg.Wait()
+	// in the main body of promoteNewPrimary, we would be bound by the time of
+	// the slowest replica, instead of the time of the fastest successful
+	// replica, and we want ERS to be fast.
+	go func() {
+		replWg.Wait()
+		allReplicasDoneCancel()
+	}()

 	primaryErr := handlePrimary(newPrimaryTabletAlias, newPrimaryTabletInfo)
 	if primaryErr != nil {
 		erp.logger.Warningf("master failed to PopulateReparentJournal")
@@ -199,7 +237,29 @@
 		return vterrors.Wrapf(primaryErr, "failed to PopulateReparentJournal on master: %v", primaryErr)
 	}
-	return nil
+	select {
+	case <-replSuccessCtx.Done():
+		// At least one replica was able to SetMaster successfully.
+		return nil
+	case <-allReplicasDoneCtx.Done():
+		// There are certain timing issues between replSuccessCtx.Done firing
+		// and allReplicasDoneCtx.Done firing, so we check again whether truly
+		// all replicas failed (i.e. all `numReplicas` goroutines recorded an
+		// error) or one or more actually managed to succeed.
+		errCount := len(rec.Errors)
+
+		switch {
+		case errCount > numReplicas:
+			// Technically, rec.Errors should never be greater than numReplicas,
+			// but it's better to err on the side of caution here, and to be
+			// explicit that this case is doubly unexpected.
+			return vterrors.Wrapf(rec.Error(), "received more errors (= %d) than replicas (= %d), which should be impossible: %v", errCount, numReplicas, rec.Error())
+		case errCount == numReplicas:
+			return vterrors.Wrapf(rec.Error(), "%d replica(s) failed: %v", numReplicas, rec.Error())
+		default:
+			return nil
+		}
+	}
 }

 func (erp *EmergencyReparenter) reparentShardLocked(ctx context.Context, ev *events.Reparent, keyspace string, shard string, opts EmergencyReparentOptions) error {
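The select at the end of promoteNewPrimary has a subtlety worth spelling out: by the time allReplicasDoneCtx fires, replSuccessCtx may also be Done, and Go's select chooses uniformly at random among ready cases, which is why the code re-checks the error count. Below is a hedged, self-contained sketch of just that disambiguation step; the resolve helper is invented for illustration, and a plain error slice with errors.Join (Go 1.20+) stands in for concurrency.AllErrorRecorder and its aggregate Error().

```go
package main

import (
	"errors"
	"fmt"
)

// resolve mirrors the tail of promoteNewPrimary: when both contexts may be
// Done, select can take either branch, so the outcome is decided by comparing
// the number of recorded errors against the number of workers started.
func resolve(numWorkers int, errs []error) error {
	switch n := len(errs); {
	case n > numWorkers:
		// Should be impossible: more errors recorded than workers started.
		return fmt.Errorf("received more errors (= %d) than workers (= %d), which should be impossible: %w", n, numWorkers, errors.Join(errs...))
	case n == numWorkers:
		// Every worker recorded an error, so none succeeded.
		return fmt.Errorf("%d worker(s) failed: %w", numWorkers, errors.Join(errs...))
	default:
		// Fewer errors than workers: at least one worker succeeded.
		return nil
	}
}

func main() {
	// Two of three workers failed: still a success overall.
	fmt.Println(resolve(3, []error{errors.New("a"), errors.New("b")}))
	// Both workers failed: the aggregate error is returned.
	fmt.Println(resolve(2, []error{errors.New("a"), errors.New("b")}))
}
```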