Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
20 changes: 20 additions & 0 deletions go/test/endtoend/reparent/newfeaturetest/reparent_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -157,3 +157,23 @@ func TestChangeTypeWithoutSemiSync(t *testing.T) {
err = clusterInstance.VtctldClientProcess.ExecuteCommand("ChangeTabletType", replica.Alias, "replica")
require.NoError(t, err)
}

// TestERSWithWriteInPromoteReplica checks that an EmergencyReparentShard
// succeeds even when a write happens during the PromoteReplica call on the
// new primary.
func TestERSWithWriteInPromoteReplica(t *testing.T) {
	defer cluster.PanicHandler(t)
	clus := utils.SetupReparentCluster(t, "semi_sync")
	defer utils.TeardownCluster(clus)

	shardTablets := clus.Keyspaces[0].Shards[0].Vttablets
	replicas := []*cluster.Vttablet{shardTablets[1], shardTablets[2], shardTablets[3]}
	utils.ConfirmReplication(t, shardTablets[0], replicas)

	// Drop a sidecar table on the promotion candidate so that the sidecardb
	// check performed during promotion has to run a DML query.
	newPrimary := shardTablets[3]
	stmts := []string{
		"set sql_log_bin=0",
		"SET @@global.super_read_only=0",
		"DROP TABLE _vt.heartbeat",
		"set sql_log_bin=1",
	}
	utils.RunSQLs(context.Background(), t, stmts, newPrimary)

	_, err := utils.Ers(clus, newPrimary, "60s", "30s")
	require.NoError(t, err, "ERS should not fail even if there is a sidecardb change")
}
10 changes: 6 additions & 4 deletions go/vt/vtctl/grpcvtctldserver/server_slow_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -107,8 +107,9 @@ func TestEmergencyReparentShardSlow(t *testing.T) {
},
},
},
PopulateReparentJournalDelays: map[string]time.Duration{
"zone1-0000000200": time.Second * 29,
SetReplicationSourceDelays: map[string]time.Duration{
"zone1-0000000100": time.Second * 29,
"zone1-0000000101": time.Second * 29,
},
PopulateReparentJournalResults: map[string]error{
"zone1-0000000200": nil,
Expand Down Expand Up @@ -224,8 +225,9 @@ func TestEmergencyReparentShardSlow(t *testing.T) {
},
},
},
PopulateReparentJournalDelays: map[string]time.Duration{
"zone1-0000000200": time.Second * 31,
SetReplicationSourceDelays: map[string]time.Duration{
"zone1-0000000100": time.Second * 31,
"zone1-0000000101": time.Second * 31,
},
PopulateReparentJournalResults: map[string]error{
"zone1-0000000200": nil,
Expand Down
75 changes: 31 additions & 44 deletions go/vt/vtctl/reparentutil/emergency_reparenter.go
Original file line number Diff line number Diff line change
Expand Up @@ -308,7 +308,7 @@ func (erp *EmergencyReparenter) reparentShardLocked(ctx context.Context, ev *eve
// Since the new primary tablet belongs to the validCandidateTablets list, we no longer need any additional constraint checks

// Final step is to promote our primary candidate
err = erp.promoteNewPrimary(ctx, ev, newPrimary, opts, tabletMap, stoppedReplicationSnapshot.statusMap)
_, err = erp.reparentReplicas(ctx, ev, newPrimary, tabletMap, stoppedReplicationSnapshot.statusMap, opts, false /* intermediateReparent */)
if err != nil {
return err
}
Expand Down Expand Up @@ -458,7 +458,7 @@ func (erp *EmergencyReparenter) promoteIntermediateSource(

// we reparent all the other valid tablets to start replication from our new source
// we wait for all the replicas so that we can choose a better candidate from the ones that started replication later
reachableTablets, err := erp.reparentReplicas(ctx, ev, source, validTabletMap, statusMap, opts, true /* waitForAllReplicas */, false /* populateReparentJournal */)
reachableTablets, err := erp.reparentReplicas(ctx, ev, source, validTabletMap, statusMap, opts, true /* intermediateReparent */)
if err != nil {
return nil, err
}
Expand Down Expand Up @@ -487,8 +487,10 @@ func (erp *EmergencyReparenter) reparentReplicas(
tabletMap map[string]*topo.TabletInfo,
statusMap map[string]*replicationdatapb.StopReplicationStatus,
opts EmergencyReparentOptions,
waitForAllReplicas bool,
populateReparentJournal bool,
intermediateReparent bool, // intermediateReparent represents whether the reparenting of the replicas is the final reparent or not.
// Since ERS can sometimes promote a tablet, which isn't a candidate for promotion, if it is the most advanced, we don't want to
// call PromoteReplica on it. We just want to get all replicas to replicate from it to get caught up, after which we'll promote the primary
// candidate separately. During the final promotion, we call `PromoteReplica` and `PopulateReparentJournal`.
) ([]*topodatapb.Tablet, error) {

var (
Expand All @@ -497,6 +499,8 @@ func (erp *EmergencyReparenter) reparentReplicas(
)

replCtx, replCancel := context.WithTimeout(context.Background(), opts.WaitReplicasTimeout)
primaryCtx, primaryCancel := context.WithTimeout(ctx, topo.RemoteOperationTimeout)
defer primaryCancel()

event.DispatchUpdate(ev, "reparenting all tablets")

Expand All @@ -518,13 +522,26 @@ func (erp *EmergencyReparenter) reparentReplicas(
rec := concurrency.AllErrorRecorder{}

handlePrimary := func(alias string, tablet *topodatapb.Tablet) error {
position, err := erp.tmc.PrimaryPosition(replCtx, tablet)
if err != nil {
return err
}
if populateReparentJournal {
if !intermediateReparent {
var position string
var err error
if ev.ShardInfo.PrimaryAlias == nil {
erp.logger.Infof("setting up %v as new primary for an uninitialized cluster", alias)
// we call InitPrimary when the PrimaryAlias in the ShardInfo is empty. This happens when we have an uninitialized cluster.
position, err = erp.tmc.InitPrimary(primaryCtx, tablet, SemiSyncAckers(opts.durability, tablet) > 0)
} else {
erp.logger.Infof("starting promotion for the new primary - %v", alias)
// we call PromoteReplica which changes the tablet type, fixes the semi-sync, set the primary to read-write and flushes the binlogs
position, err = erp.tmc.PromoteReplica(primaryCtx, tablet, SemiSyncAckers(opts.durability, tablet) > 0)
}
if err != nil {
return vterrors.Wrapf(err, "primary-elect tablet %v failed to be upgraded to primary: %v", alias, err)
}
erp.logger.Infof("populating reparent journal on new primary %v", alias)
return erp.tmc.PopulateReparentJournal(replCtx, tablet, now, opts.lockAction, newPrimaryTablet.Alias, position)
err = erp.tmc.PopulateReparentJournal(primaryCtx, tablet, now, opts.lockAction, tablet.Alias, position)
if err != nil {
return vterrors.Wrapf(err, "failed to PopulateReparentJournal on primary: %v", err)
}
}
return nil
}
Expand Down Expand Up @@ -559,8 +576,8 @@ func (erp *EmergencyReparenter) reparentReplicas(
replicaMutex.Unlock()

// Signal that at least one goroutine succeeded to SetReplicationSource.
// We do this only when we do not want to wait for all the replicas
if !waitForAllReplicas {
// We do this only when we do not want to wait for all the replicas.
if !intermediateReparent {
replSuccessCancel()
}
}
Expand Down Expand Up @@ -594,10 +611,10 @@ func (erp *EmergencyReparenter) reparentReplicas(

primaryErr := handlePrimary(topoproto.TabletAliasString(newPrimaryTablet.Alias), newPrimaryTablet)
if primaryErr != nil {
erp.logger.Warningf("primary failed to PopulateReparentJournal")
erp.logger.Errorf("failed to promote %s to primary", topoproto.TabletAliasString(newPrimaryTablet.Alias))
replCancel()

return nil, vterrors.Wrapf(primaryErr, "failed to PopulateReparentJournal on primary: %v", primaryErr)
return nil, vterrors.Wrapf(primaryErr, "failed to promote %v to primary", topoproto.TabletAliasString(newPrimaryTablet.Alias))
}

// We should only cancel the context that all the replicas are using when they are done.
Expand Down Expand Up @@ -709,36 +726,6 @@ func (erp *EmergencyReparenter) identifyPrimaryCandidate(
return nil, vterrors.Errorf(vtrpc.Code_INTERNAL, "unreachable - did not find a valid primary candidate even though the valid candidate list was non-empty")
}

// promoteNewPrimary upgrades newPrimary to be the shard primary (initializing
// the shard instead if it has never had a primary) and then reparents the
// remaining tablets to replicate from it.
func (erp *EmergencyReparenter) promoteNewPrimary(
	ctx context.Context,
	ev *events.Reparent,
	newPrimary *topodatapb.Tablet,
	opts EmergencyReparentOptions,
	tabletMap map[string]*topo.TabletInfo,
	statusMap map[string]*replicationdatapb.StopReplicationStatus,
) error {
	var promoteErr error
	if ev.ShardInfo.PrimaryAlias == nil {
		// An empty PrimaryAlias in the ShardInfo means the cluster was never
		// initialized, so we bootstrap it with InitPrimary rather than promoting.
		erp.logger.Infof("setting up %v as new primary for an uninitialized cluster", newPrimary.Alias)
		_, promoteErr = erp.tmc.InitPrimary(ctx, newPrimary, SemiSyncAckers(opts.durability, newPrimary) > 0)
	} else {
		erp.logger.Infof("starting promotion for the new primary - %v", newPrimary.Alias)
		// PromoteReplica changes the tablet type, fixes semi-sync, sets the
		// primary read-write and flushes the binlogs.
		_, promoteErr = erp.tmc.PromoteReplica(ctx, newPrimary, SemiSyncAckers(opts.durability, newPrimary) > 0)
	}
	if promoteErr != nil {
		return vterrors.Wrapf(promoteErr, "primary-elect tablet %v failed to be upgraded to primary: %v", newPrimary.Alias, promoteErr)
	}

	// Reparent all other replicas to the newly promoted primary. We do not
	// need to wait for every replica; a single success lets us finish early.
	_, reparentErr := erp.reparentReplicas(ctx, ev, newPrimary, tabletMap, statusMap, opts, false /* waitForAllReplicas */, true /* populateReparentJournal */)
	return reparentErr
}

// filterValidCandidates filters valid tablets, keeping only the ones which can successfully be promoted without any constraint failures and can make forward progress on being promoted
func (erp *EmergencyReparenter) filterValidCandidates(validTablets []*topodatapb.Tablet, tabletsReachable []*topodatapb.Tablet, prevPrimary *topodatapb.Tablet, opts EmergencyReparentOptions) ([]*topodatapb.Tablet, error) {
var restrictedValidTablets []*topodatapb.Tablet
Expand Down
Loading