diff --git a/go/test/endtoend/onlineddl/vrepl/onlineddl_vrepl_test.go b/go/test/endtoend/onlineddl/vrepl/onlineddl_vrepl_test.go index 204ea4e4cc9..f7753c9c6ca 100644 --- a/go/test/endtoend/onlineddl/vrepl/onlineddl_vrepl_test.go +++ b/go/test/endtoend/onlineddl/vrepl/onlineddl_vrepl_test.go @@ -368,6 +368,8 @@ func TestSchemaChange(t *testing.T) { // no migrations pending at this time time.Sleep(10 * time.Second) onlineddl.CheckCancelAllMigrations(t, &vtParams, 0) + // Validate that invoking CANCEL ALL via vtctl works + onlineddl.CheckCancelAllMigrationsViaVtctl(t, &clusterInstance.VtctlclientProcess, keyspaceName) }) t.Run("cancel all migrations: some migrations to cancel", func(t *testing.T) { for i := range shards { @@ -387,6 +389,26 @@ func TestSchemaChange(t *testing.T) { wg.Wait() onlineddl.CheckCancelAllMigrations(t, &vtParams, len(shards)*count) }) + t.Run("cancel all migrations: some migrations to cancel via vtctl", func(t *testing.T) { + for i := range shards { + throttleApp(shards[i].Vttablets[0], throttlerAppName) + defer unthrottleApp(shards[i].Vttablets[0], throttlerAppName) + } + // spawn n migrations; cancel them via cancel-all + var wg sync.WaitGroup + count := 4 + for i := 0; i < count; i++ { + wg.Add(1) + go func() { + defer wg.Done() + _ = testOnlineDDLStatement(t, alterTableThrottlingStatement, "online", providedUUID, "vtgate", "vrepl_col", false) + }() + } + wg.Wait() + // cancelling via vtctl does not return values. We CANCEL ALL via vtctl, then validate via VTGate that nothing remains to be cancelled. + onlineddl.CheckCancelAllMigrationsViaVtctl(t, &clusterInstance.VtctlclientProcess, keyspaceName) + onlineddl.CheckCancelAllMigrations(t, &vtParams, 0) + }) // reparent shard -80 to replica // and then reparent it back to original state diff --git a/go/test/endtoend/onlineddl/vtctlutil.go b/go/test/endtoend/onlineddl/vtctlutil.go new file mode 100644 index 00000000000..b98d1134283 --- /dev/null +++ b/go/test/endtoend/onlineddl/vtctlutil.go @@ -0,0 +1,33 @@ +/* +Copyright 2021 The Vitess Authors. + +Licensed under the Apache License, Version 2.0 (the "License"); +you may not use this file except in compliance with the License. +You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, software +distributed under the License is distributed on an "AS IS" BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +See the License for the specific language governing permissions and +limitations under the License. +*/ + +package onlineddl + +import ( + "testing" + + "vitess.io/vitess/go/test/endtoend/cluster" + + "github.com/stretchr/testify/assert" +) + +// CheckCancelAllMigrations cancels all pending migrations. There is no validation for affected migrations. +func CheckCancelAllMigrationsViaVtctl(t *testing.T, vtctlclient *cluster.VtctlClientProcess, keyspace string) { + cancelQuery := "alter vitess_migration cancel all" + + _, err := vtctlclient.ApplySchemaWithOutput(keyspace, cancelQuery, cluster.VtctlClientParams{SkipPreflight: true}) + assert.NoError(t, err) +} diff --git a/go/vt/schemamanager/tablet_executor.go b/go/vt/schemamanager/tablet_executor.go index 4ca04e53917..f22f6abe508 100644 --- a/go/vt/schemamanager/tablet_executor.go +++ b/go/vt/schemamanager/tablet_executor.go @@ -149,7 +149,7 @@ func (exec *TabletExecutor) Validate(ctx context.Context, sqls []string) error { // We ignore DATABASE-level DDLs here because detectBigSchemaChanges doesn't // look at them anyway. - parsedDDLs, _, _, err := exec.parseDDLs(sqls) + parsedDDLs, _, _, _, err := exec.parseDDLs(sqls) if err != nil { return err } @@ -162,14 +162,15 @@ func (exec *TabletExecutor) Validate(ctx context.Context, sqls []string) error { return err } -func (exec *TabletExecutor) parseDDLs(sqls []string) ([]sqlparser.DDLStatement, []sqlparser.DBDDLStatement, [](*sqlparser.RevertMigration), error) { +func (exec *TabletExecutor) parseDDLs(sqls []string) ([]sqlparser.DDLStatement, []sqlparser.DBDDLStatement, [](*sqlparser.RevertMigration), [](*sqlparser.AlterMigration), error) { parsedDDLs := make([]sqlparser.DDLStatement, 0) parsedDBDDLs := make([]sqlparser.DBDDLStatement, 0) revertStatements := make([](*sqlparser.RevertMigration), 0) + alterMigrationStatements := make([](*sqlparser.AlterMigration), 0) for _, sql := range sqls { stmt, err := sqlparser.Parse(sql) if err != nil { - return nil, nil, nil, fmt.Errorf("failed to parse sql: %s, got error: %v", sql, err) + return nil, nil, nil, nil, fmt.Errorf("failed to parse sql: %s, got error: %v", sql, err) } switch stmt := stmt.(type) { case sqlparser.DDLStatement: @@ -178,13 +179,15 @@ func (exec *TabletExecutor) parseDDLs(sqls []string) ([]sqlparser.DDLStatement, parsedDBDDLs = append(parsedDBDDLs, stmt) case *sqlparser.RevertMigration: revertStatements = append(revertStatements, stmt) + case *sqlparser.AlterMigration: + alterMigrationStatements = append(alterMigrationStatements, stmt) default: if len(exec.tablets) != 1 { - return nil, nil, nil, fmt.Errorf("non-ddl statements can only be executed for single shard keyspaces: %s", sql) + return nil, nil, nil, nil, fmt.Errorf("non-ddl statements can only be executed for single shard keyspaces: %s", sql) } } } - return parsedDDLs, parsedDBDDLs, revertStatements, nil + return parsedDDLs, parsedDBDDLs, revertStatements, alterMigrationStatements, nil } // IsOnlineSchemaDDL returns true if we expect to run a online schema change DDL