diff --git a/go/test/endtoend/onlineddl/scheduler/onlineddl_scheduler_test.go b/go/test/endtoend/onlineddl/scheduler/onlineddl_scheduler_test.go index 5ba9e9b2d76..51dfddbe0fc 100644 --- a/go/test/endtoend/onlineddl/scheduler/onlineddl_scheduler_test.go +++ b/go/test/endtoend/onlineddl/scheduler/onlineddl_scheduler_test.go @@ -384,7 +384,10 @@ func testScheduler(t *testing.T) { ALTER TABLE t2_test ENGINE=InnoDB; ` instantAlterT1Statement = ` - ALTER TABLE t1_test ADD COLUMN i0 INT NOT NULL DEFAULT 0; + ALTER TABLE t1_test ADD COLUMN i0 INT NOT NULL DEFAULT 0 + ` + instantUndoAlterT1Statement = ` + ALTER TABLE t1_test DROP COLUMN i0 ` dropT1Statement = ` DROP TABLE IF EXISTS t1_test @@ -405,7 +408,7 @@ func testScheduler(t *testing.T) { ALTER TABLE nonexistent FORCE ` populateT1Statement = ` - insert into t1_test values (1, 'new_row') + insert ignore into t1_test values (1, 'new_row') ` ) @@ -798,6 +801,64 @@ func testScheduler(t *testing.T) { }) }) } + + if forceCutoverCapable { + t.Run("force_cutover_instant", func(t *testing.T) { + ctx, cancel := context.WithTimeout(context.Background(), extendedWaitTime*5) + defer cancel() + + t.Run("populate t1_test", func(t *testing.T) { + onlineddl.VtgateExecQuery(t, &vtParams, populateT1Statement, "") + }) + + commitTransactionChan := make(chan any) + transactionErrorChan := make(chan error) + t.Run("locking table rows", func(t *testing.T) { + go runInTransaction(t, ctx, primaryTablet, "select * from t1_test for update", commitTransactionChan, transactionErrorChan) + }) + + t.Run("execute migration", func(t *testing.T) { + t1uuid = testOnlineDDLStatement(t, createParams(instantAlterT1Statement, ddlStrategy+" --prefer-instant-ddl --force-cut-over-after=1ms", "vtgate", "", "", true)) // skip wait + }) + t.Run("expect completion", func(t *testing.T) { + status := onlineddl.WaitForMigrationStatus(t, &vtParams, shards, t1uuid, normalWaitTime, schema.OnlineDDLStatusComplete, schema.OnlineDDLStatusFailed) + fmt.Printf("# Migration status (for debug purposes): <%s>\n", status) + onlineddl.CheckMigrationStatus(t, &vtParams, shards, t1uuid, schema.OnlineDDLStatusComplete) + }) + t.Run("check special_plan", func(t *testing.T) { + rs := onlineddl.ReadMigrations(t, &vtParams, t1uuid) + require.NotNil(t, rs) + for _, row := range rs.Named().Rows { + specialPlan := row.AsString("special_plan", "") + assert.Contains(t, specialPlan, "instant-ddl") + } + }) + t.Run("expect transaction failure", func(t *testing.T) { + select { + case commitTransactionChan <- true: // good + case <-ctx.Done(): + assert.Fail(t, ctx.Err().Error()) + } + // Transaction will now attempt to commit. But we expect our "force_cutover" to have terminated + // the transaction's connection. + select { + case err := <-transactionErrorChan: + assert.ErrorContains(t, err, "broken pipe") + case <-ctx.Done(): + assert.Fail(t, ctx.Err().Error()) + } + }) + t.Run("cleanup: undo migration", func(t *testing.T) { + t1uuid = testOnlineDDLStatement(t, createParams(instantUndoAlterT1Statement, ddlStrategy+" --prefer-instant-ddl --force-cut-over-after=1ms", "vtgate", "", "", true)) // skip wait + }) + t.Run("cleanup: expect completion", func(t *testing.T) { + status := onlineddl.WaitForMigrationStatus(t, &vtParams, shards, t1uuid, normalWaitTime, schema.OnlineDDLStatusComplete, schema.OnlineDDLStatusFailed) + fmt.Printf("# Migration status (for debug purposes): <%s>\n", status) + onlineddl.CheckMigrationStatus(t, &vtParams, shards, t1uuid, schema.OnlineDDLStatusComplete) + }) + }) + } + t.Run("ALTER both tables non-concurrent", func(t *testing.T) { t1uuid = testOnlineDDLStatement(t, createParams(trivialAlterT1Statement, ddlStrategy, "vtgate", "", "", true)) // skip wait t2uuid = testOnlineDDLStatement(t, createParams(trivialAlterT2Statement, ddlStrategy, "vtgate", "", "", true)) // skip wait diff --git a/go/vt/vttablet/onlineddl/executor.go b/go/vt/vttablet/onlineddl/executor.go index 1165e9e05d1..9aaf08d5f72 100644 --- a/go/vt/vttablet/onlineddl/executor.go +++ b/go/vt/vttablet/onlineddl/executor.go @@ -2482,6 +2482,65 @@ func (e *Executor) executeAlterViewOnline(ctx context.Context, onlineDDL *schema return nil } +// executeSpecialAlterDirectDDLActionMigration executes a special plan using a direct ALTER TABLE statement. +func (e *Executor) executeSpecialAlterDirectDDLActionMigration(ctx context.Context, onlineDDL *schema.OnlineDDL) (err error) { + + forceCutOverAfter, err := onlineDDL.StrategySetting().ForceCutOverAfter() + if err != nil { + return err + } + + bufferingCtx, bufferingContextCancel := context.WithCancel(ctx) + defer bufferingContextCancel() + + // Buffer queries while issuing the ALTER TABLE statement (we assume this ALTER is going to be quick, + // as in ALGORITHM=INSTANT or a quick partition operation) + toggleBuffering := func(bufferQueries bool) { + log.Infof("toggling buffering: %t in migration %v", bufferQueries, onlineDDL.UUID) + timeout := onlineDDL.CutOverThreshold + qrBufferExtraTimeout + + e.toggleBufferTableFunc(bufferingCtx, onlineDDL.Table, timeout, bufferQueries) + if !bufferQueries { + // unbuffer existing queries: + bufferingContextCancel() + } + log.Infof("toggled buffering: %t in migration %v", bufferQueries, onlineDDL.UUID) + } + defer toggleBuffering(false) + toggleBuffering(true) + + // Give a fraction of a second for a scenario where a query is in + // query executor, it passed the ACLs and is _about to_ execute. This will be nicer to those queries: + // they will be able to complete before the ALTER. + e.updateMigrationStage(ctx, onlineDDL.UUID, "graceful wait for buffering") + time.Sleep(100 * time.Millisecond) + + if forceCutOverAfter > 0 { + // Irrespective of the --force-cut-over-after flag value, as long as it's nonzero, we now terminate + // connections adn transactions on the migrated table. + // --force-cut-over-after was designed to work with `vitess` migrations, that could cut-over multiple times, + // and was meant to set a limit to the overall duration of the attempts, for example 1 hour. + // With INSTANT DDL or other quick operations, this becomes meaningless. Once we begin the operation, there + // is no going back. We submit it to MySQL, and it takes however long it takes. + // In this particular function, we expect *very quick* operation. + // So we take --force-cut-over-after as a hint that we should force terminate connections and transactions. + // + // We should only proceed with forceful cut over if there is no pending atomic transaction for the table. + // This will help in keeping the atomicity guarantee of a prepared transaction. + if err := e.checkOnPreparedPool(ctx, onlineDDL.Table, 100*time.Millisecond); err != nil { + return vterrors.Wrapf(err, "checking prepared pool for table") + } + if err := e.killTableLockHoldersAndAccessors(ctx, onlineDDL.Table); err != nil { + return vterrors.Wrapf(err, "failed killing table lock holders and accessors") + } + } + + if _, err := e.executeDirectly(ctx, onlineDDL); err != nil { + return err + } + return nil +} + // executeSpecialAlterDDLActionMigrationIfApplicable sees if the given migration can be executed via special execution path, that isn't a full blown online schema change process. func (e *Executor) executeSpecialAlterDDLActionMigrationIfApplicable(ctx context.Context, onlineDDL *schema.OnlineDDL) (specialMigrationExecuted bool, err error) { // Before we jump on to strategies... Some ALTERs can be optimized without having to run through @@ -2505,11 +2564,11 @@ func (e *Executor) executeSpecialAlterDDLActionMigrationIfApplicable(ctx context case instantDDLSpecialOperation: schemadiff.AddInstantAlgorithm(specialPlan.alterTable) onlineDDL.SQL = sqlparser.CanonicalString(specialPlan.alterTable) - if _, err := e.executeDirectly(ctx, onlineDDL); err != nil { + if err := e.executeSpecialAlterDirectDDLActionMigration(ctx, onlineDDL); err != nil { return false, err } case rangePartitionSpecialOperation: - if _, err := e.executeDirectly(ctx, onlineDDL); err != nil { + if err := e.executeSpecialAlterDirectDDLActionMigration(ctx, onlineDDL); err != nil { return false, err } default: