diff --git a/go/vt/vtctl/reparentutil/emergency_reparenter.go b/go/vt/vtctl/reparentutil/emergency_reparenter.go index 6d5716b1f12..1ad55715985 100644 --- a/go/vt/vtctl/reparentutil/emergency_reparenter.go +++ b/go/vt/vtctl/reparentutil/emergency_reparenter.go @@ -19,6 +19,7 @@ package reparentutil import ( "context" "fmt" + "sync" "time" "k8s.io/apimachinery/pkg/util/sets" @@ -150,15 +151,22 @@ func (erp *EmergencyReparenter) promoteNewPrimary( event.DispatchUpdate(ev, "reparenting all tablets") - // (@ajm188) - A question while migrating: Is this by design? By my read, - // there's nothing consuming that error channel, meaning any replica that - // fails to SetMaster will actually block trying to send to the errCh. In - // addition, the only way an operator will ever notice these errors will be - // in the logs on the tablet, and not from any error propagation in - // vtctl/wrangler, so a shard will continue to attempt to serve (probably?) - // after a partially-failed ERS. + // Create a context and cancel function to watch for the first successful + // SetMaster call on a replica. We use a background context so that this + // context is only ever Done when its cancel is called by the background + // goroutine we're about to spin up. + // + // Similarly, create a context and cancel for the replica waiter goroutine + // to signal when all replica goroutines have finished. In the case where at + // least one replica succeeds, replSuccessCtx will be canceled first, while + // allReplicasDoneCtx is guaranteed to be canceled within + // opts.WaitReplicasTimeout plus some jitter. 
+ replSuccessCtx, replSuccessCancel := context.WithCancel(context.Background()) + allReplicasDoneCtx, allReplicasDoneCancel := context.WithCancel(context.Background()) + now := time.Now().UnixNano() - errCh := make(chan error) + replWg := sync.WaitGroup{} + rec := concurrency.AllErrorRecorder{} handlePrimary := func(alias string, ti *topo.TabletInfo) error { erp.logger.Infof("populating reparent journal on new master %v", alias) @@ -166,31 +174,61 @@ func (erp *EmergencyReparenter) promoteNewPrimary( } handleReplica := func(alias string, ti *topo.TabletInfo) { + defer replWg.Done() erp.logger.Infof("setting new master on replica %v", alias) - var err error - defer func() { errCh <- err }() - forceStart := false if status, ok := statusMap[alias]; ok { - forceStart = ReplicaWasRunning(status) + fs, err := ReplicaWasRunning(status) + if err != nil { + err = vterrors.Wrapf(err, "tablet %v could not determine StopReplicationStatus: %v", alias, err) + rec.RecordError(err) + + return + } + + forceStart = fs } - err = erp.tmc.SetMaster(replCtx, ti.Tablet, newPrimaryTabletInfo.Alias, now, "", forceStart) + err := erp.tmc.SetMaster(replCtx, ti.Tablet, newPrimaryTabletInfo.Alias, now, "", forceStart) if err != nil { err = vterrors.Wrapf(err, "tablet %v SetMaster failed: %v", alias, err) + rec.RecordError(err) + + return } + + // Signal that at least one goroutine succeeded to SetMaster. + replSuccessCancel() } + numReplicas := 0 + for alias, ti := range tabletMap { switch { case alias == newPrimaryTabletAlias: continue case !opts.IgnoreReplicas.Has(alias): + replWg.Add(1) + numReplicas++ go handleReplica(alias, ti) } } + // Spin up a background goroutine to wait until all replica goroutines + // finished. Polling this way allows us to have promoteNewPrimary return + // success as soon as (a) the primary successfully populates its reparent + // journal and (b) at least one replica successfully begins replicating. 
+ // + // If we were to follow the more common pattern of blocking on replWg.Wait() + // in the main body of promoteNewPrimary, we would be bound to the + // time of slowest replica, instead of the time of the fastest successful + // replica, and we want ERS to be fast. + go func() { + replWg.Wait() + allReplicasDoneCancel() + }() + primaryErr := handlePrimary(newPrimaryTabletAlias, newPrimaryTabletInfo) if primaryErr != nil { erp.logger.Warningf("master failed to PopulateReparentJournal") @@ -199,7 +237,29 @@ func (erp *EmergencyReparenter) promoteNewPrimary( return vterrors.Wrapf(primaryErr, "failed to PopulateReparentJournal on master: %v", primaryErr) } - return nil + select { + case <-replSuccessCtx.Done(): + // At least one replica was able to SetMaster successfully + return nil + case <-allReplicasDoneCtx.Done(): + // There are certain timing issues between replSuccessCtx.Done firing + // and allReplicasDoneCtx.Done firing, so we check again if truly all + // replicas failed (where `numReplicas` goroutines recorded an error) or + // one or more actually managed to succeed. + errCount := len(rec.Errors) + + switch { + case errCount > numReplicas: + // Technically, rec.Errors should never be greater than numReplicas, + // but it's better to err on the side of caution here, and + // we're going to be explicit that this is doubly unexpected. 
+ return vterrors.Wrapf(rec.Error(), "received more errors (= %d) than replicas (= %d), which should be impossible: %v", errCount, numReplicas, rec.Error()) + case errCount == numReplicas: + return vterrors.Wrapf(rec.Error(), "%d replica(s) failed: %v", numReplicas, rec.Error()) + default: + return nil + } + } } func (erp *EmergencyReparenter) reparentShardLocked(ctx context.Context, ev *events.Reparent, keyspace string, shard string, opts EmergencyReparentOptions) error { diff --git a/go/vt/vtctl/reparentutil/emergency_reparenter_test.go b/go/vt/vtctl/reparentutil/emergency_reparenter_test.go index d47cee02f38..d948c47b2c9 100644 --- a/go/vt/vtctl/reparentutil/emergency_reparenter_test.go +++ b/go/vt/vtctl/reparentutil/emergency_reparenter_test.go @@ -142,6 +142,10 @@ func TestEmergencyReparenter_reparentShardLocked(t *testing.T) { Error: nil, }, }, + SetMasterResults: map[string]error{ + "zone1-0000000100": nil, + "zone1-0000000101": nil, + }, StopReplicationAndGetStatusResults: map[string]struct { Status *replicationdatapb.Status StopStatus *replicationdatapb.StopReplicationStatus @@ -149,6 +153,7 @@ func TestEmergencyReparenter_reparentShardLocked(t *testing.T) { }{ "zone1-0000000100": { StopStatus: &replicationdatapb.StopReplicationStatus{ + Before: &replicationdatapb.Status{}, After: &replicationdatapb.Status{ MasterUuid: "3E11FA47-71CA-11E1-9E33-C80AA9429562", RelayLogPosition: "MySQL56/3E11FA47-71CA-11E1-9E33-C80AA9429562:1-21", @@ -157,6 +162,7 @@ func TestEmergencyReparenter_reparentShardLocked(t *testing.T) { }, "zone1-0000000101": { StopStatus: &replicationdatapb.StopReplicationStatus{ + Before: &replicationdatapb.Status{}, After: &replicationdatapb.Status{ MasterUuid: "3E11FA47-71CA-11E1-9E33-C80AA9429562", RelayLogPosition: "MySQL56/3E11FA47-71CA-11E1-9E33-C80AA9429562:1-21", @@ -165,6 +171,7 @@ func TestEmergencyReparenter_reparentShardLocked(t *testing.T) { }, "zone1-0000000102": { StopStatus: &replicationdatapb.StopReplicationStatus{ + Before: 
&replicationdatapb.Status{}, After: &replicationdatapb.Status{ MasterUuid: "3E11FA47-71CA-11E1-9E33-C80AA9429562", RelayLogPosition: "MySQL56/3E11FA47-71CA-11E1-9E33-C80AA9429562:1-26", @@ -240,6 +247,10 @@ func TestEmergencyReparenter_reparentShardLocked(t *testing.T) { Error: nil, }, }, + SetMasterResults: map[string]error{ + "zone1-0000000100": nil, + "zone1-0000000102": nil, + }, StopReplicationAndGetStatusResults: map[string]struct { Status *replicationdatapb.Status StopStatus *replicationdatapb.StopReplicationStatus @@ -247,6 +258,7 @@ func TestEmergencyReparenter_reparentShardLocked(t *testing.T) { }{ "zone1-0000000100": { StopStatus: &replicationdatapb.StopReplicationStatus{ + Before: &replicationdatapb.Status{}, After: &replicationdatapb.Status{ MasterUuid: "3E11FA47-71CA-11E1-9E33-C80AA9429562", RelayLogPosition: "MySQL56/3E11FA47-71CA-11E1-9E33-C80AA9429562:1-21", @@ -255,6 +267,7 @@ func TestEmergencyReparenter_reparentShardLocked(t *testing.T) { }, "zone1-0000000101": { StopStatus: &replicationdatapb.StopReplicationStatus{ + Before: &replicationdatapb.Status{}, After: &replicationdatapb.Status{ MasterUuid: "3E11FA47-71CA-11E1-9E33-C80AA9429562", RelayLogPosition: "MySQL56/3E11FA47-71CA-11E1-9E33-C80AA9429562:1-21", @@ -263,6 +276,7 @@ func TestEmergencyReparenter_reparentShardLocked(t *testing.T) { }, "zone1-0000000102": { StopStatus: &replicationdatapb.StopReplicationStatus{ + Before: &replicationdatapb.Status{}, After: &replicationdatapb.Status{ MasterUuid: "3E11FA47-71CA-11E1-9E33-C80AA9429562", RelayLogPosition: "MySQL56/3E11FA47-71CA-11E1-9E33-C80AA9429562:1-21", @@ -1003,7 +1017,7 @@ func TestEmergencyReparenter_promoteNewPrimary(t *testing.T) { }, }, }, - "zone1-00000000102": { + "zone1-0000000102": { Tablet: &topodatapb.Tablet{ Alias: &topodatapb.TabletAlias{ Cell: "zone1", @@ -1012,7 +1026,7 @@ func TestEmergencyReparenter_promoteNewPrimary(t *testing.T) { Hostname: "requires force start", }, }, - "zone1-00000000404": { + "zone1-0000000404": { 
Tablet: &topodatapb.Tablet{ Alias: &topodatapb.TabletAlias{ Cell: "zone1", @@ -1052,6 +1066,7 @@ func TestEmergencyReparenter_promoteNewPrimary(t *testing.T) { "zone1-0000000100": {}, "zone1-0000000101": {}, }, + statusMap: map[string]*replicationdatapb.StopReplicationStatus{}, opts: EmergencyReparentOptions{}, shouldErr: true, }, @@ -1174,7 +1189,7 @@ func TestEmergencyReparenter_promoteNewPrimary(t *testing.T) { shouldErr: true, }, { - name: "replicas failing to SetMaster does not fail the promotion", + name: "all replicas failing to SetMaster does fail the promotion", ts: memorytopo.NewServer("zone1"), tmc: &emergencyReparenterTestTMClient{ PopulateReparentJournalResults: map[string]error{ @@ -1189,7 +1204,7 @@ func TestEmergencyReparenter_promoteNewPrimary(t *testing.T) { }, }, SetMasterResults: map[string]error{ - // everyone fails, who cares?! + // everyone fails, we all fail "zone1-0000000101": assert.AnError, "zone1-0000000102": assert.AnError, }, @@ -1225,6 +1240,118 @@ func TestEmergencyReparenter_promoteNewPrimary(t *testing.T) { }, statusMap: map[string]*replicationdatapb.StopReplicationStatus{}, opts: EmergencyReparentOptions{}, + shouldErr: true, + }, + { + name: "all replicas slow to SetMaster does fail the promotion", + ts: memorytopo.NewServer("zone1"), + tmc: &emergencyReparenterTestTMClient{ + PopulateReparentJournalResults: map[string]error{ + "zone1-0000000100": nil, + }, + PromoteReplicaResults: map[string]struct { + Result string + Error error + }{ + "zone1-0000000100": { + Error: nil, + }, + }, + SetMasterDelays: map[string]time.Duration{ + // nothing is failing, we're just slow + "zone1-0000000101": time.Millisecond * 100, + "zone1-0000000102": time.Millisecond * 75, + }, + SetMasterResults: map[string]error{ + "zone1-0000000101": nil, + "zone1-0000000102": nil, + }, + }, + keyspace: "testkeyspace", + shard: "-", + newPrimaryTabletAlias: "zone1-0000000100", + tabletMap: map[string]*topo.TabletInfo{ + "zone1-0000000100": { + Tablet: 
&topodatapb.Tablet{ + Alias: &topodatapb.TabletAlias{ + Cell: "zone1", + Uid: 100, + }, + }, + }, + "zone1-0000000101": { + Tablet: &topodatapb.Tablet{ + Alias: &topodatapb.TabletAlias{ + Cell: "zone1", + Uid: 101, + }, + }, + }, + "zone1-0000000102": { + Tablet: &topodatapb.Tablet{ + Alias: &topodatapb.TabletAlias{ + Cell: "zone1", + Uid: 102, + }, + }, + }, + }, + statusMap: map[string]*replicationdatapb.StopReplicationStatus{}, + opts: EmergencyReparentOptions{ + WaitReplicasTimeout: time.Millisecond * 10, + }, + shouldErr: true, + }, + { + name: "one replica failing to SetMaster does not fail the promotion", + ts: memorytopo.NewServer("zone1"), + tmc: &emergencyReparenterTestTMClient{ + PopulateReparentJournalResults: map[string]error{ + "zone1-0000000100": nil, + }, + PromoteReplicaResults: map[string]struct { + Result string + Error error + }{ + "zone1-0000000100": { + Error: nil, + }, + }, + SetMasterResults: map[string]error{ + "zone1-0000000101": nil, // this one succeeds, so we're good + "zone1-0000000102": assert.AnError, + }, + }, + keyspace: "testkeyspace", + shard: "-", + newPrimaryTabletAlias: "zone1-0000000100", + tabletMap: map[string]*topo.TabletInfo{ + "zone1-0000000100": { + Tablet: &topodatapb.Tablet{ + Alias: &topodatapb.TabletAlias{ + Cell: "zone1", + Uid: 100, + }, + }, + }, + "zone1-0000000101": { + Tablet: &topodatapb.Tablet{ + Alias: &topodatapb.TabletAlias{ + Cell: "zone1", + Uid: 101, + }, + }, + }, + "zone1-0000000102": { + Tablet: &topodatapb.Tablet{ + Alias: &topodatapb.TabletAlias{ + Cell: "zone1", + Uid: 102, + }, + }, + }, + }, + statusMap: map[string]*replicationdatapb.StopReplicationStatus{}, shouldErr: false, }, } @@ -1235,10 +1362,6 @@ func TestEmergencyReparenter_promoteNewPrimary(t *testing.T) { t.Run(tt.name, func(t *testing.T) { t.Parallel() - if tt.ts == nil { - t.Skip("toposerver is nil, assuming we're working on other test cases") - } - ctx := context.Background() logger := logutil.NewMemoryLogger() ev := 
&events.Reparent{} @@ -1535,6 +1658,8 @@ type emergencyReparenterTestTMClient struct { Error error } // keyed by tablet alias. + SetMasterDelays map[string]time.Duration + // keyed by tablet alias. SetMasterResults map[string]error // keyed by tablet alias. StopReplicationAndGetStatusDelays map[string]time.Duration @@ -1583,6 +1708,18 @@ func (fake *emergencyReparenterTestTMClient) SetMaster(ctx context.Context, tabl } key := topoproto.TabletAliasString(tablet.Alias) + + if fake.SetMasterDelays != nil { + if delay, ok := fake.SetMasterDelays[key]; ok { + select { + case <-ctx.Done(): + return ctx.Err() + case <-time.After(delay): + // proceed to results + } + } + } + if result, ok := fake.SetMasterResults[key]; ok { return result } diff --git a/go/vt/vtctl/reparentutil/replication.go b/go/vt/vtctl/reparentutil/replication.go index 4644b0b6246..42440b82918 100644 --- a/go/vt/vtctl/reparentutil/replication.go +++ b/go/vt/vtctl/reparentutil/replication.go @@ -140,9 +140,14 @@ func FindValidEmergencyReparentCandidates( } // ReplicaWasRunning returns true if a StopReplicationStatus indicates that the -// replica had running replication threads before being stopped. -func ReplicaWasRunning(stopStatus *replicationdatapb.StopReplicationStatus) bool { - return stopStatus.Before.IoThreadRunning || stopStatus.Before.SqlThreadRunning +// replica had running replication threads before being stopped. It returns an +// error if the Before state of replication is nil. 
+func ReplicaWasRunning(stopStatus *replicationdatapb.StopReplicationStatus) (bool, error) { + if stopStatus == nil || stopStatus.Before == nil { + return false, vterrors.Errorf(vtrpc.Code_INVALID_ARGUMENT, "could not determine Before state of StopReplicationStatus %v", stopStatus) + } + + return stopStatus.Before.IoThreadRunning || stopStatus.Before.SqlThreadRunning, nil } // StopReplicationAndBuildStatusMaps stops replication on all replicas, then diff --git a/go/vt/vtctl/reparentutil/replication_test.go b/go/vt/vtctl/reparentutil/replication_test.go index 0b8c3de6997..12a045c60dd 100644 --- a/go/vt/vtctl/reparentutil/replication_test.go +++ b/go/vt/vtctl/reparentutil/replication_test.go @@ -716,9 +716,10 @@ func TestReplicaWasRunning(t *testing.T) { t.Parallel() tests := []struct { - name string - in *replicationdatapb.StopReplicationStatus - expected bool + name string + in *replicationdatapb.StopReplicationStatus + expected bool + shouldErr bool }{ { name: "io thread running", @@ -728,7 +729,8 @@ func TestReplicaWasRunning(t *testing.T) { SqlThreadRunning: false, }, }, - expected: true, + expected: true, + shouldErr: false, }, { name: "sql thread running", @@ -738,7 +740,8 @@ func TestReplicaWasRunning(t *testing.T) { SqlThreadRunning: true, }, }, - expected: true, + expected: true, + shouldErr: false, }, { name: "io and sql threads running", @@ -748,7 +751,8 @@ func TestReplicaWasRunning(t *testing.T) { SqlThreadRunning: true, }, }, - expected: true, + expected: true, + shouldErr: false, }, { name: "no replication threads running", @@ -758,7 +762,22 @@ func TestReplicaWasRunning(t *testing.T) { SqlThreadRunning: false, }, }, - expected: false, + expected: false, + shouldErr: false, + }, + { + name: "passing nil pointer results in an error", + in: nil, + expected: false, + shouldErr: true, + }, + { + name: "status.Before is nil results in an error", + in: &replicationdatapb.StopReplicationStatus{ + Before: nil, + }, + expected: false, + shouldErr: true, }, } 
@@ -768,7 +787,14 @@ func TestReplicaWasRunning(t *testing.T) { t.Run(tt.name, func(t *testing.T) { t.Parallel() - actual := ReplicaWasRunning(tt.in) + actual, err := ReplicaWasRunning(tt.in) + if tt.shouldErr { + assert.Error(t, err) + + return + } + + assert.NoError(t, err) assert.Equal(t, tt.expected, actual) }) }