diff --git a/go/test/endtoend/onlineddl/scheduler/onlineddl_scheduler_test.go b/go/test/endtoend/onlineddl/scheduler/onlineddl_scheduler_test.go index d894ffbc2d8..f57e905adc0 100644 --- a/go/test/endtoend/onlineddl/scheduler/onlineddl_scheduler_test.go +++ b/go/test/endtoend/onlineddl/scheduler/onlineddl_scheduler_test.go @@ -1352,7 +1352,7 @@ func testScheduler(t *testing.T) { onlineddl.CheckMigrationStatus(t, &vtParams, shards, uuid, schema.OnlineDDLStatusCancelled) }) - // now, we submit the exact same migratoin again: same UUID, same migration context. + // now, we submit the exact same migration again: same UUID, same migration context. t.Run("resubmit migration", func(t *testing.T) { executedUUID := testOnlineDDLStatement(t, createParams(trivialAlterT1Statement, ddlStrategy, "vtctl", "", "", true)) // skip wait require.Equal(t, uuid, executedUUID) diff --git a/go/vt/vttablet/onlineddl/executor.go b/go/vt/vttablet/onlineddl/executor.go index 6d39b767775..386b1d89563 100644 --- a/go/vt/vttablet/onlineddl/executor.go +++ b/go/vt/vttablet/onlineddl/executor.go @@ -605,7 +605,7 @@ func (e *Executor) primaryPosition(ctx context.Context) (pos replication.Positio } // terminateVReplMigration stops vreplication, then removes the _vt.vreplication entry for the given migration -func (e *Executor) terminateVReplMigration(ctx context.Context, uuid string) error { +func (e *Executor) terminateVReplMigration(ctx context.Context, uuid string, deleteEntry bool) error { tablet, err := e.ts.GetTablet(ctx, e.tabletAlias) if err != nil { return err @@ -621,10 +621,26 @@ func (e *Executor) terminateVReplMigration(ctx context.Context, uuid string) err if _, err := e.vreplicationExec(ctx, tablet.Tablet, query); err != nil { log.Errorf("FAIL vreplicationExec: uuid=%s, query=%v, error=%v", uuid, query, err) } + if deleteEntry { + if err := e.deleteVReplicationEntry(ctx, uuid); err != nil { + return err + } + } - if err := e.deleteVReplicationEntry(ctx, uuid); err != nil { + return nil +} + +func (e *Executor) startVReplication(ctx context.Context, tablet *topodatapb.Tablet, workflow string) (err error) { + query, err := sqlparser.ParseAndBind(sqlStartVReplStream, + sqltypes.StringBindVariable(e.dbName), + sqltypes.StringBindVariable(workflow), + ) + if err != nil { return err } + if _, err := e.vreplicationExec(ctx, tablet, query); err != nil { + return vterrors.Wrapf(err, "FAIL vreplicationExec: uuid=%s, query=%v", workflow, query) + } return nil } @@ -1071,6 +1087,16 @@ func (e *Executor) cutOverVReplMigration(ctx context.Context, s *VReplStream, sh } go log.Infof("cutOverVReplMigration %v: stopped vreplication", s.workflow) + defer func() { + if !renameWasSuccessful { + // Restarting vreplication + if err := e.startVReplication(ctx, tablet.Tablet, s.workflow); err != nil { + log.Errorf("cutOverVReplMigration %v: failed restarting vreplication after cutover failure: %v", s.workflow, err) + } + go log.Infof("cutOverVReplMigration %v: started vreplication after cutover failure", s.workflow) + } + }() + // rename tables atomically (remember, writes on source tables are stopped) { if isVreplicationTestSuite { @@ -1345,7 +1371,7 @@ func (e *Executor) initVreplicationRevertMigration(ctx context.Context, onlineDD // ExecuteWithVReplication sets up the grounds for a vreplication schema migration func (e *Executor) ExecuteWithVReplication(ctx context.Context, onlineDDL *schema.OnlineDDL, revertMigration *schema.OnlineDDL) error { // make sure there's no vreplication workflow running under same name - _ = e.terminateVReplMigration(ctx, onlineDDL.UUID) + _ = e.terminateVReplMigration(ctx, onlineDDL.UUID, true) if e.tabletTypeFunc() != topodatapb.TabletType_PRIMARY { return ErrExecutorNotWritableTablet @@ -1506,7 +1532,7 @@ func (e *Executor) terminateMigration(ctx context.Context, onlineDDL *schema.Onl // migration could have started by a different tablet. We need to actively verify if it is running s, _ := e.readVReplStream(ctx, onlineDDL.UUID, true) foundRunning = (s != nil && s.isRunning()) - if err := e.terminateVReplMigration(ctx, onlineDDL.UUID); err != nil { + if err := e.terminateVReplMigration(ctx, onlineDDL.UUID, false); err != nil { return foundRunning, fmt.Errorf("Error terminating migration, vreplication exec error: %+v", err) } } @@ -3137,6 +3163,7 @@ func (e *Executor) reviewRunningMigrations(ctx context.Context) (countRunnning i cancellable = append(cancellable, newCancellableMigration(uuid, s.message)) } if !s.isRunning() { + log.Infof("migration %s in 'running' state but vreplication state is '%s'", uuid, s.state.String()) return nil } // This VRepl migration may have started from outside this tablet, so @@ -4128,7 +4155,7 @@ func (e *Executor) ForceCutOverPendingMigrations(ctx context.Context) (result *s if err != nil { return result, err } - log.Infof("ForceCutOverPendingMigrations: iterating %v migrations %s", len(uuids)) + log.Infof("ForceCutOverPendingMigrations: iterating %v migrations", len(uuids)) result = &sqltypes.Result{} for _, uuid := range uuids {