Skip to content
66 changes: 54 additions & 12 deletions go/test/endtoend/onlineddl/singleton/onlineddl_singleton_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -39,13 +39,14 @@ var (
clusterInstance *cluster.LocalProcessCluster
vtParams mysql.ConnParams

hostname = "localhost"
keyspaceName = "ks"
cell = "zone1"
schemaChangeDirectory = ""
tableName = `stress_test`
onlineDDLStrategy = "online -singleton"
createStatement = `
hostname = "localhost"
keyspaceName = "ks"
cell = "zone1"
schemaChangeDirectory = ""
tableName = `stress_test`
onlineSingletonDDLStrategy = "online -singleton"
onlineSingletonContextDDLStrategy = "online -singleton-context"
createStatement = `
CREATE TABLE stress_test (
id bigint(20) not null,
rand_val varchar(32) null default '',
Expand All @@ -61,13 +62,19 @@ var (
alterTableThrottlingStatement = `
ALTER TABLE stress_test DROP COLUMN created_timestamp
`
multiAlterTableThrottlingStatement = `
ALTER TABLE stress_test ENGINE=InnoDB;
ALTER TABLE stress_test ENGINE=InnoDB;
ALTER TABLE stress_test ENGINE=InnoDB;
`
// A trivial statement which must succeed and does not change the schema
alterTableTrivialStatement = `
ALTER TABLE stress_test ENGINE=InnoDB
`
dropStatement = `
DROP TABLE stress_test
`
multiDropStatements = `DROP TABLE IF EXISTS t1; DROP TABLE IF EXISTS t2; DROP TABLE IF EXISTS t3;`
)

func TestMain(m *testing.M) {
Expand Down Expand Up @@ -145,7 +152,7 @@ func TestSchemaChange(t *testing.T) {
// CREATE
t.Run("CREATE TABLE", func(t *testing.T) {
// The table does not exist
uuid := testOnlineDDLStatement(t, createStatement, onlineDDLStrategy, "vtgate", "", "", false)
uuid := testOnlineDDLStatement(t, createStatement, onlineSingletonDDLStrategy, "vtgate", "", "", false)
uuids = append(uuids, uuid)
onlineddl.CheckMigrationStatus(t, &vtParams, shards, uuid, schema.OnlineDDLStatusComplete)
checkTable(t, tableName, true)
Expand Down Expand Up @@ -202,7 +209,7 @@ func TestSchemaChange(t *testing.T) {
})

t.Run("successful online alter, vtgate", func(t *testing.T) {
uuid := testOnlineDDLStatement(t, alterTableTrivialStatement, onlineDDLStrategy, "vtgate", "hint_col", "", false)
uuid := testOnlineDDLStatement(t, alterTableTrivialStatement, onlineSingletonDDLStrategy, "vtgate", "hint_col", "", false)
uuids = append(uuids, uuid)
onlineddl.CheckMigrationStatus(t, &vtParams, shards, uuid, schema.OnlineDDLStatusComplete)
onlineddl.CheckCancelMigration(t, &vtParams, shards, uuid, false)
Expand All @@ -217,10 +224,45 @@ func TestSchemaChange(t *testing.T) {
checkTable(t, tableName, true)
})

var throttledUUIDs []string
// singleton-context
t.Run("throttled migrations, singleton-context", func(t *testing.T) {
uuidList := testOnlineDDLStatement(t, multiAlterTableThrottlingStatement, "gh-ost -singleton-context --max-load=Threads_running=1", "vtctl", "hint_col", "", false)
throttledUUIDs = strings.Split(uuidList, "\n")
assert.Equal(t, 3, len(throttledUUIDs))
for _, uuid := range throttledUUIDs {
onlineddl.CheckMigrationStatus(t, &vtParams, shards, uuid, schema.OnlineDDLStatusRunning, schema.OnlineDDLStatusQueued)
}
})
t.Run("failed migrations, singleton-context", func(t *testing.T) {
_ = testOnlineDDLStatement(t, multiAlterTableThrottlingStatement, "gh-ost -singleton-context --max-load=Threads_running=1", "vtctl", "hint_col", "rejected", false)
})
t.Run("terminate throttled migrations", func(t *testing.T) {
for _, uuid := range throttledUUIDs {
onlineddl.CheckMigrationStatus(t, &vtParams, shards, uuid, schema.OnlineDDLStatusRunning, schema.OnlineDDLStatusQueued)
onlineddl.CheckCancelMigration(t, &vtParams, shards, uuid, true)
}
time.Sleep(2 * time.Second)
for _, uuid := range throttledUUIDs {
uuid = strings.TrimSpace(uuid)
onlineddl.CheckMigrationStatus(t, &vtParams, shards, uuid, schema.OnlineDDLStatusFailed, schema.OnlineDDLStatusCancelled)
}
})

t.Run("successful multiple statement, singleton-context, vtctl", func(t *testing.T) {
uuidList := testOnlineDDLStatement(t, multiDropStatements, onlineSingletonContextDDLStrategy, "vtctl", "", "", false)
uuidSlice := strings.Split(uuidList, "\n")
assert.Equal(t, 3, len(uuidSlice))
for _, uuid := range uuidSlice {
uuid = strings.TrimSpace(uuid)
onlineddl.CheckMigrationStatus(t, &vtParams, shards, uuid, schema.OnlineDDLStatusComplete)
}
})

//DROP

t.Run("online DROP TABLE", func(t *testing.T) {
uuid := testOnlineDDLStatement(t, dropStatement, onlineDDLStrategy, "vtgate", "", "", false)
uuid := testOnlineDDLStatement(t, dropStatement, onlineSingletonDDLStrategy, "vtgate", "", "", false)
uuids = append(uuids, uuid)
onlineddl.CheckMigrationStatus(t, &vtParams, shards, uuid, schema.OnlineDDLStatusComplete)
checkTable(t, tableName, false)
Expand Down Expand Up @@ -283,15 +325,15 @@ func testOnlineDDLStatement(t *testing.T, alterStatement string, ddlStrategy str
func testRevertMigration(t *testing.T, revertUUID string, executeStrategy string, expectError string, skipWait bool) (uuid string) {
revertQuery := fmt.Sprintf("revert vitess_migration '%s'", revertUUID)
if executeStrategy == "vtgate" {
result := onlineddl.VtgateExecDDL(t, &vtParams, onlineDDLStrategy, revertQuery, expectError)
result := onlineddl.VtgateExecDDL(t, &vtParams, onlineSingletonDDLStrategy, revertQuery, expectError)
if result != nil {
row := result.Named().Row()
if row != nil {
uuid = row.AsString("uuid", "")
}
}
} else {
output, err := clusterInstance.VtctlclientProcess.ApplySchemaWithOutput(keyspaceName, revertQuery, cluster.VtctlClientParams{DDLStrategy: onlineDDLStrategy, SkipPreflight: true})
output, err := clusterInstance.VtctlclientProcess.ApplySchemaWithOutput(keyspaceName, revertQuery, cluster.VtctlClientParams{DDLStrategy: onlineSingletonDDLStrategy, SkipPreflight: true})
if expectError == "" {
assert.NoError(t, err)
uuid = output
Expand Down
12 changes: 9 additions & 3 deletions go/test/endtoend/onlineddl/vtgate_util.go
Original file line number Diff line number Diff line change
Expand Up @@ -107,16 +107,22 @@ func CheckCancelAllMigrations(t *testing.T, vtParams *mysql.ConnParams, expectCo
}

// CheckMigrationStatus verifies that the migration indicated by given UUID has the given expected status
func CheckMigrationStatus(t *testing.T, vtParams *mysql.ConnParams, shards []cluster.Shard, uuid string, expectStatus schema.OnlineDDLStatus) {
func CheckMigrationStatus(t *testing.T, vtParams *mysql.ConnParams, shards []cluster.Shard, uuid string, expectStatuses ...schema.OnlineDDLStatus) {
showQuery := fmt.Sprintf("show vitess_migrations like '%s'", uuid)
r := VtgateExecQuery(t, vtParams, showQuery, "")
fmt.Printf("# output for `%s`:\n", showQuery)
PrintQueryResult(os.Stdout, r)

count := 0
for _, row := range r.Named().Rows {
if row["migration_uuid"].ToString() == uuid && row["migration_status"].ToString() == string(expectStatus) {
count++
if row["migration_uuid"].ToString() != uuid {
continue
}
for _, expectStatus := range expectStatuses {
if row["migration_status"].ToString() == string(expectStatus) {
count++
break
}
}
}
assert.Equal(t, len(shards), count)
Expand Down
15 changes: 11 additions & 4 deletions go/vt/schema/ddl_strategy.go
Original file line number Diff line number Diff line change
Expand Up @@ -28,9 +28,10 @@ var (
)

const (
declarativeFlag = "declarative"
skipTopoFlag = "skip-topo"
singletonFlag = "singleton"
declarativeFlag = "declarative"
skipTopoFlag = "skip-topo"
singletonFlag = "singleton"
singletonContextFlag = "singleton-context"
)

// DDLStrategy suggests how an ALTER TABLE should run (e.g. "direct", "online", "gh-ost" or "pt-osc")
Expand Down Expand Up @@ -123,6 +124,11 @@ func (setting *DDLStrategySetting) IsSingleton() bool {
return setting.hasFlag(singletonFlag)
}

// IsSingletonContext checks if strategy options include -singleton-context
func (setting *DDLStrategySetting) IsSingletonContext() bool {
return setting.hasFlag(singletonContextFlag)
}

// RuntimeOptions returns the options used as runtime flags for given strategy, removing any internal hint options
func (setting *DDLStrategySetting) RuntimeOptions() []string {
opts, _ := shlex.Split(setting.Options)
Expand All @@ -132,6 +138,7 @@ func (setting *DDLStrategySetting) RuntimeOptions() []string {
case isFlag(opt, declarativeFlag):
case isFlag(opt, skipTopoFlag):
case isFlag(opt, singletonFlag):
case isFlag(opt, singletonContextFlag):
default:
validOpts = append(validOpts, opt)
}
Expand All @@ -142,7 +149,7 @@ func (setting *DDLStrategySetting) RuntimeOptions() []string {
// IsSkipTopo suggests that DDL should apply to tables bypassing global topo request
func (setting *DDLStrategySetting) IsSkipTopo() bool {
switch {
case setting.IsSingleton():
case setting.IsSingleton(), setting.IsSingletonContext():
return true
case setting.hasFlag(skipTopoFlag):
return true
Expand Down
4 changes: 3 additions & 1 deletion go/vt/schemamanager/tablet_executor.go
Original file line number Diff line number Diff line change
Expand Up @@ -250,7 +250,9 @@ func (exec *TabletExecutor) executeSQL(ctx context.Context, sql string, execResu
for _, onlineDDL := range onlineDDLs {
if exec.ddlStrategySetting.IsSkipTopo() {
exec.executeOnAllTablets(ctx, execResult, onlineDDL.SQL, true)
exec.wr.Logger().Printf("%s\n", onlineDDL.UUID)
if len(execResult.SuccessShards) > 0 {
exec.wr.Logger().Printf("%s\n", onlineDDL.UUID)
}
} else {
exec.executeOnlineDDL(ctx, execResult, onlineDDL)
}
Expand Down
53 changes: 37 additions & 16 deletions go/vt/vttablet/onlineddl/executor.go
Original file line number Diff line number Diff line change
Expand Up @@ -1167,16 +1167,17 @@ func (e *Executor) readMigration(ctx context.Context, uuid string) (onlineDDL *s
return nil, nil, ErrMigrationNotFound
}
onlineDDL = &schema.OnlineDDL{
Keyspace: row["keyspace"].ToString(),
Table: row["mysql_table"].ToString(),
Schema: row["mysql_schema"].ToString(),
SQL: row["migration_statement"].ToString(),
UUID: row["migration_uuid"].ToString(),
Strategy: schema.DDLStrategy(row["strategy"].ToString()),
Options: row["options"].ToString(),
Status: schema.OnlineDDLStatus(row["migration_status"].ToString()),
Retries: row.AsInt64("retries", 0),
TabletAlias: row["tablet"].ToString(),
Keyspace: row["keyspace"].ToString(),
Table: row["mysql_table"].ToString(),
Schema: row["mysql_schema"].ToString(),
SQL: row["migration_statement"].ToString(),
UUID: row["migration_uuid"].ToString(),
Strategy: schema.DDLStrategy(row["strategy"].ToString()),
Options: row["options"].ToString(),
Status: schema.OnlineDDLStatus(row["migration_status"].ToString()),
Retries: row.AsInt64("retries", 0),
TabletAlias: row["tablet"].ToString(),
RequestContext: row["migration_context"].ToString(),
}
return onlineDDL, row, nil
}
Expand Down Expand Up @@ -2518,7 +2519,7 @@ func (e *Executor) SubmitMigration(

onlineDDL, err := schema.OnlineDDLFromCommentedStatement(stmt)
if err != nil {
return nil, fmt.Errorf("Error submitting migration %s: %v", sqlparser.String(stmt), err)
return nil, vterrors.Errorf(vtrpcpb.Code_INVALID_ARGUMENT, "Error submitting migration %s: %v", sqlparser.String(stmt), err)
}
_, actionStr, err := onlineDDL.GetActionStr()
if err != nil {
Expand All @@ -2543,16 +2544,36 @@ func (e *Executor) SubmitMigration(
return nil, err
}

if onlineDDL.StrategySetting().IsSingleton() {
if err := e.initSchema(ctx); err != nil {
log.Error(err)
return nil, err
}

if onlineDDL.StrategySetting().IsSingleton() || onlineDDL.StrategySetting().IsSingletonContext() {
e.migrationMutex.Lock()
defer e.migrationMutex.Unlock()

uuids, err := e.readPendingMigrationsUUIDs(ctx)
pendingUUIDs, err := e.readPendingMigrationsUUIDs(ctx)
if err != nil {
return result, err
return nil, err
}
if len(uuids) > 0 {
return result, fmt.Errorf("singleton migration rejected: found pending migrations [%s]", strings.Join(uuids, ", "))
switch {
case onlineDDL.StrategySetting().IsSingleton():
// We will reject this migration if there's any pending migration
if len(pendingUUIDs) > 0 {
return result, vterrors.Errorf(vtrpcpb.Code_FAILED_PRECONDITION, "singleton migration rejected: found pending migrations [%s]", strings.Join(pendingUUIDs, ", "))
}
case onlineDDL.StrategySetting().IsSingletonContext():
// We will reject this migration if there's any pending migration within a different context
for _, pendingUUID := range pendingUUIDs {
pendingOnlineDDL, _, err := e.readMigration(ctx, pendingUUID)
if err != nil {
return nil, err
}
if pendingOnlineDDL.RequestContext != onlineDDL.RequestContext {
return nil, vterrors.Errorf(vtrpcpb.Code_FAILED_PRECONDITION, "singleton migration rejected: found pending migration: %s in different context: %s", pendingUUID, pendingOnlineDDL.RequestContext)
}
}
}
}

Expand Down
6 changes: 4 additions & 2 deletions go/vt/vttablet/onlineddl/schema.go
Original file line number Diff line number Diff line change
Expand Up @@ -248,7 +248,8 @@ const (
retries,
ddl_action,
artifacts,
tablet
tablet,
migration_context
FROM _vt.schema_migrations
WHERE
migration_uuid=%a
Expand All @@ -273,7 +274,8 @@ const (
retries,
ddl_action,
artifacts,
tablet
tablet,
migration_context
FROM _vt.schema_migrations
WHERE
migration_status='ready'
Expand Down