Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -1181,7 +1181,7 @@ func testScheduler(t *testing.T) {
onlineddl.CheckMigrationStatus(t, &vtParams, shards, uuid, schema.OnlineDDLStatusCancelled)
})

// now, we submit the exact same migratoin again: same UUID, same migration context.
// now, we submit the exact same migration again: same UUID, same migration context.
t.Run("resubmit migration", func(t *testing.T) {
executedUUID := testOnlineDDLStatement(t, createParams(trivialAlterT1Statement, ddlStrategy, "vtctl", "", "", true)) // skip wait
require.Equal(t, uuid, executedUUID)
Expand Down
37 changes: 32 additions & 5 deletions go/vt/vttablet/onlineddl/executor.go
Original file line number Diff line number Diff line change
Expand Up @@ -598,7 +598,7 @@ func (e *Executor) primaryPosition(ctx context.Context) (pos replication.Positio
}

// terminateVReplMigration stops vreplication for the given migration and, when deleteEntry is true, also removes its _vt.vreplication entry
func (e *Executor) terminateVReplMigration(ctx context.Context, uuid string) error {
func (e *Executor) terminateVReplMigration(ctx context.Context, uuid string, deleteEntry bool) error {
tablet, err := e.ts.GetTablet(ctx, e.tabletAlias)
if err != nil {
return err
Expand All @@ -614,10 +614,26 @@ func (e *Executor) terminateVReplMigration(ctx context.Context, uuid string) err
if _, err := e.vreplicationExec(ctx, tablet.Tablet, query); err != nil {
log.Errorf("FAIL vreplicationExec: uuid=%s, query=%v, error=%v", uuid, query, err)
}
if deleteEntry {
if err := e.deleteVReplicationEntry(ctx, uuid); err != nil {
return err
}
}

if err := e.deleteVReplicationEntry(ctx, uuid); err != nil {
return nil
}

// startVReplication binds e.dbName and the given workflow name into the
// sqlStartVReplStream statement and runs the resulting query on the supplied
// tablet via vreplicationExec.
//
// A failure to parse/bind the statement is returned unchanged. A failure to
// execute it is returned wrapped with the workflow name and the query text.
// On success the result of the execution is discarded and nil is returned.
func (e *Executor) startVReplication(ctx context.Context, tablet *topodatapb.Tablet, workflow string) (err error) {
	dbNameVar := sqltypes.StringBindVariable(e.dbName)
	workflowVar := sqltypes.StringBindVariable(workflow)

	startQuery, err := sqlparser.ParseAndBind(sqlStartVReplStream, dbNameVar, workflowVar)
	if err != nil {
		return err
	}

	_, execErr := e.vreplicationExec(ctx, tablet, startQuery)
	if execErr != nil {
		return vterrors.Wrapf(execErr, "FAIL vreplicationExec: uuid=%s, query=%v", workflow, startQuery)
	}
	return nil
}

Expand Down Expand Up @@ -1064,6 +1080,16 @@ func (e *Executor) cutOverVReplMigration(ctx context.Context, s *VReplStream, sh
}
go log.Infof("cutOverVReplMigration %v: stopped vreplication", s.workflow)

defer func() {
if !renameWasSuccessful {
// Restarting vreplication
if err := e.startVReplication(ctx, tablet.Tablet, s.workflow); err != nil {
log.Errorf("cutOverVReplMigration %v: failed restarting vreplication after cutover failure: %v", s.workflow, err)
}
go log.Infof("cutOverVReplMigration %v: started vreplication after cutover failure", s.workflow)
}
}()

// rename tables atomically (remember, writes on source tables are stopped)
{
if isVreplicationTestSuite {
Expand Down Expand Up @@ -1338,7 +1364,7 @@ func (e *Executor) initVreplicationRevertMigration(ctx context.Context, onlineDD
// ExecuteWithVReplication sets up the grounds for a vreplication schema migration
func (e *Executor) ExecuteWithVReplication(ctx context.Context, onlineDDL *schema.OnlineDDL, revertMigration *schema.OnlineDDL) error {
// make sure there's no vreplication workflow running under same name
_ = e.terminateVReplMigration(ctx, onlineDDL.UUID)
_ = e.terminateVReplMigration(ctx, onlineDDL.UUID, true)

if e.tabletTypeFunc() != topodatapb.TabletType_PRIMARY {
return ErrExecutorNotWritableTablet
Expand Down Expand Up @@ -1499,7 +1525,7 @@ func (e *Executor) terminateMigration(ctx context.Context, onlineDDL *schema.Onl
// migration could have started by a different tablet. We need to actively verify if it is running
s, _ := e.readVReplStream(ctx, onlineDDL.UUID, true)
foundRunning = (s != nil && s.isRunning())
if err := e.terminateVReplMigration(ctx, onlineDDL.UUID); err != nil {
if err := e.terminateVReplMigration(ctx, onlineDDL.UUID, false); err != nil {
return foundRunning, fmt.Errorf("Error terminating migration, vreplication exec error: %+v", err)
}
}
Expand Down Expand Up @@ -3071,6 +3097,7 @@ func (e *Executor) reviewRunningMigrations(ctx context.Context) (countRunnning i
cancellable = append(cancellable, newCancellableMigration(uuid, s.message))
}
if !s.isRunning() {
log.Infof("migration %s in 'running' state but vreplication state is '%s'", uuid, s.state.String())
return nil
}
// This VRepl migration may have started from outside this tablet, so
Expand Down Expand Up @@ -4020,7 +4047,7 @@ func (e *Executor) ForceCutOverPendingMigrations(ctx context.Context) (result *s
if err != nil {
return result, err
}
log.Infof("ForceCutOverPendingMigrations: iterating %v migrations %s", len(uuids))
log.Infof("ForceCutOverPendingMigrations: iterating %v migrations", len(uuids))

result = &sqltypes.Result{}
for _, uuid := range uuids {
Expand Down
Loading