From 2905c85bfb3cd385859130689365b8e31ca94e54 Mon Sep 17 00:00:00 2001 From: Shlomi Noach <2607934+shlomi-noach@users.noreply.github.com> Date: Sun, 12 Mar 2023 13:17:39 +0200 Subject: [PATCH 1/4] Online DDL: synchronous execution of migrations; fix race condition in vitess migration submission; introduce ready_to_complete_timestamp and WasReadyToComplete Signed-off-by: Shlomi Noach <2607934+shlomi-noach@users.noreply.github.com> --- .../scheduler/onlineddl_scheduler_test.go | 21 ++++ go/vt/schema/online_ddl.go | 34 +++-- .../schema/onlineddl/schema_migrations.sql | 1 + go/vt/vttablet/onlineddl/executor.go | 116 +++++++----------- go/vt/vttablet/onlineddl/schema.go | 11 +- 5 files changed, 88 insertions(+), 95 deletions(-) diff --git a/go/test/endtoend/onlineddl/scheduler/onlineddl_scheduler_test.go b/go/test/endtoend/onlineddl/scheduler/onlineddl_scheduler_test.go index 7bed08c71ca..5b41a4c069c 100644 --- a/go/test/endtoend/onlineddl/scheduler/onlineddl_scheduler_test.go +++ b/go/test/endtoend/onlineddl/scheduler/onlineddl_scheduler_test.go @@ -552,6 +552,27 @@ func testScheduler(t *testing.T) { // both should be still running! 
onlineddl.CheckMigrationStatus(t, &vtParams, shards, t1uuid, schema.OnlineDDLStatusRunning) onlineddl.CheckMigrationStatus(t, &vtParams, shards, t2uuid, schema.OnlineDDLStatusQueued, schema.OnlineDDLStatusReady) + + { + rs := onlineddl.ReadMigrations(t, &vtParams, t1uuid) + require.NotNil(t, rs) + for _, row := range rs.Named().Rows { + readyToComplete := row.AsInt64("ready_to_complete", 0) + assert.Equal(t, int64(1), readyToComplete) + wasReadyToComplete := row.AsInt64("ready_to_complete_timestamp is not null", 0) + assert.Equal(t, int64(1), wasReadyToComplete) + } + } + { + rs := onlineddl.ReadMigrations(t, &vtParams, t2uuid) + require.NotNil(t, rs) + for _, row := range rs.Named().Rows { + readyToComplete := row.AsInt64("ready_to_complete", 0) + assert.Equal(t, int64(0), readyToComplete) + wasReadyToComplete := row.AsInt64("ready_to_complete_timestamp is not null", 0) + assert.Equal(t, int64(0), wasReadyToComplete) + } + } }) t.Run("unthrottle, expect t2 running", func(t *testing.T) { onlineddl.UnthrottleAllMigrations(t, &vtParams) diff --git a/go/vt/schema/online_ddl.go b/go/vt/schema/online_ddl.go index 7141d9ec71b..889caf811bc 100644 --- a/go/vt/schema/online_ddl.go +++ b/go/vt/schema/online_ddl.go @@ -24,7 +24,6 @@ import ( "regexp" "strconv" "strings" - "time" vtrpcpb "vitess.io/vitess/go/vt/proto/vtrpc" "vitess.io/vitess/go/vt/sqlparser" @@ -84,19 +83,20 @@ const ( // OnlineDDL encapsulates the relevant information in an online schema change request type OnlineDDL struct { - Keyspace string `json:"keyspace,omitempty"` - Table string `json:"table,omitempty"` - Schema string `json:"schema,omitempty"` - SQL string `json:"sql,omitempty"` - UUID string `json:"uuid,omitempty"` - Strategy DDLStrategy `json:"strategy,omitempty"` - Options string `json:"options,omitempty"` - RequestTime int64 `json:"time_created,omitempty"` - MigrationContext string `json:"context,omitempty"` - Status OnlineDDLStatus `json:"status,omitempty"` - TabletAlias string 
`json:"tablet,omitempty"` - Retries int64 `json:"retries,omitempty"` - ReadyToComplete int64 `json:"ready_to_complete,omitempty"` + Keyspace string `json:"keyspace,omitempty"` + Table string `json:"table,omitempty"` + Schema string `json:"schema,omitempty"` + SQL string `json:"sql,omitempty"` + UUID string `json:"uuid,omitempty"` + Strategy DDLStrategy `json:"strategy,omitempty"` + Options string `json:"options,omitempty"` + // Stateful fields: + MigrationContext string `json:"context,omitempty"` + Status OnlineDDLStatus `json:"status,omitempty"` + TabletAlias string `json:"tablet,omitempty"` + Retries int64 `json:"retries,omitempty"` + ReadyToComplete int64 `json:"ready_to_complete,omitempty"` + WasReadyToComplete int64 `json:"was_ready_to_complete,omitempty"` } // FromJSON creates an OnlineDDL from json @@ -249,7 +249,6 @@ func NewOnlineDDL(keyspace string, table string, sql string, ddlStrategySetting UUID: onlineDDLUUID, Strategy: ddlStrategySetting.Strategy, Options: ddlStrategySetting.Options, - RequestTime: time.Now().UnixNano(), MigrationContext: migrationContext, Status: OnlineDDLStatusRequested, }, nil @@ -328,11 +327,6 @@ func (onlineDDL *OnlineDDL) StrategySetting() *DDLStrategySetting { return NewDDLStrategySetting(onlineDDL.Strategy, onlineDDL.Options) } -// RequestTimeSeconds converts request time to seconds (losing nano precision) -func (onlineDDL *OnlineDDL) RequestTimeSeconds() int64 { - return onlineDDL.RequestTime / int64(time.Second) -} - // ToJSON exports this onlineDDL to JSON func (onlineDDL *OnlineDDL) ToJSON() ([]byte, error) { return json.Marshal(onlineDDL) diff --git a/go/vt/sidecardb/schema/onlineddl/schema_migrations.sql b/go/vt/sidecardb/schema/onlineddl/schema_migrations.sql index 7e3f64ba185..42ada767fc9 100644 --- a/go/vt/sidecardb/schema/onlineddl/schema_migrations.sql +++ b/go/vt/sidecardb/schema/onlineddl/schema_migrations.sql @@ -69,6 +69,7 @@ CREATE TABLE IF NOT EXISTS _vt.schema_migrations `cutover_attempts` int unsigned NOT 
NULL DEFAULT '0', `is_immediate_operation` tinyint unsigned NOT NULL DEFAULT '0', `reviewed_timestamp` timestamp NULL DEFAULT NULL, + `ready_to_complete_timestamp` timestamp NULL DEFAULT NULL, PRIMARY KEY (`id`), UNIQUE KEY `uuid_idx` (`migration_uuid`), KEY `keyspace_shard_idx` (`keyspace`(64), `shard`(64)), diff --git a/go/vt/vttablet/onlineddl/executor.go b/go/vt/vttablet/onlineddl/executor.go index cc0614e97eb..a790f1f7efd 100644 --- a/go/vt/vttablet/onlineddl/executor.go +++ b/go/vt/vttablet/onlineddl/executor.go @@ -395,7 +395,9 @@ func (e *Executor) proposedMigrationConflictsWithRunningMigration(runningMigrati // Specifically, if the running migration is an ALTER, and is still busy with copying rows (copy_state), then // we consider the two to be conflicting. But, if the running migration is done copying rows, and is now only // applying binary logs, and is up-to-date, then we consider a new ALTER migration to be non-conflicting. - return atomic.LoadInt64(&runningMigration.ReadyToComplete) == 0 + if atomic.LoadInt64(&runningMigration.WasReadyToComplete) == 0 { + return true + } } return false } @@ -1321,10 +1323,6 @@ func (e *Executor) ExecuteWithVReplication(ctx context.Context, onlineDDL *schem // make sure there's no vreplication workflow running under same name _ = e.terminateVReplMigration(ctx, onlineDDL.UUID) - if conflictFound, conflictingMigration := e.isAnyConflictingMigrationRunning(onlineDDL); conflictFound { - return vterrors.Wrapf(ErrExecutorMigrationAlreadyRunning, "conflicting migration: %v over table: %v", conflictingMigration.UUID, conflictingMigration.Table) - } - if e.tabletTypeFunc() != topodatapb.TabletType_PRIMARY { return ErrExecutorNotWritableTablet } @@ -1428,10 +1426,6 @@ func (e *Executor) ExecuteWithVReplication(ctx context.Context, onlineDDL *schem // Validation included testing the backend MySQL server and the gh-ost binary itself // Execution runs first a dry run, then an actual migration func (e *Executor) ExecuteWithGhost(ctx 
context.Context, onlineDDL *schema.OnlineDDL) error { - if conflictFound, conflictingMigration := e.isAnyConflictingMigrationRunning(onlineDDL); conflictFound { - return vterrors.Wrapf(ErrExecutorMigrationAlreadyRunning, "conflicting migration: %v over table: %v", conflictingMigration.UUID, conflictingMigration.Table) - } - if e.tabletTypeFunc() != topodatapb.TabletType_PRIMARY { return ErrExecutorNotWritableTablet } @@ -1646,10 +1640,6 @@ exit $exit_code // Validation included testing the backend MySQL server and the pt-online-schema-change binary itself // Execution runs first a dry run, then an actual migration func (e *Executor) ExecuteWithPTOSC(ctx context.Context, onlineDDL *schema.OnlineDDL) error { - if conflictFound, conflictingMigration := e.isAnyConflictingMigrationRunning(onlineDDL); conflictFound { - return vterrors.Wrapf(ErrExecutorMigrationAlreadyRunning, "conflicting migration: %v over table: %v", conflictingMigration.UUID, conflictingMigration.Table) - } - if e.tabletTypeFunc() != topodatapb.TabletType_PRIMARY { return ErrExecutorNotWritableTablet } @@ -1867,15 +1857,13 @@ export MYSQL_PWD func (e *Executor) readMigration(ctx context.Context, uuid string) (onlineDDL *schema.OnlineDDL, row sqltypes.RowNamedValues, err error) { - parsed := sqlparser.BuildParsedQuery(sqlSelectMigration, ":migration_uuid") - bindVars := map[string]*querypb.BindVariable{ - "migration_uuid": sqltypes.StringBindVariable(uuid), - } - bound, err := parsed.GenerateQuery(bindVars, nil) + query, err := sqlparser.ParseAndBind(sqlSelectMigration, + sqltypes.StringBindVariable(uuid), + ) if err != nil { return onlineDDL, nil, err } - r, err := e.execQuery(ctx, bound) + r, err := e.execQuery(ctx, query) if err != nil { return onlineDDL, nil, err } @@ -1885,18 +1873,19 @@ func (e *Executor) readMigration(ctx context.Context, uuid string) (onlineDDL *s return nil, nil, ErrMigrationNotFound } onlineDDL = &schema.OnlineDDL{ - Keyspace: row["keyspace"].ToString(), - Table: 
row["mysql_table"].ToString(), - Schema: row["mysql_schema"].ToString(), - SQL: row["migration_statement"].ToString(), - UUID: row["migration_uuid"].ToString(), - Strategy: schema.DDLStrategy(row["strategy"].ToString()), - Options: row["options"].ToString(), - Status: schema.OnlineDDLStatus(row["migration_status"].ToString()), - Retries: row.AsInt64("retries", 0), - ReadyToComplete: row.AsInt64("ready_to_complete", 0), - TabletAlias: row["tablet"].ToString(), - MigrationContext: row["migration_context"].ToString(), + Keyspace: row["keyspace"].ToString(), + Table: row["mysql_table"].ToString(), + Schema: row["mysql_schema"].ToString(), + SQL: row["migration_statement"].ToString(), + UUID: row["migration_uuid"].ToString(), + Strategy: schema.DDLStrategy(row["strategy"].ToString()), + Options: row["options"].ToString(), + Status: schema.OnlineDDLStatus(row["migration_status"].ToString()), + Retries: row.AsInt64("retries", 0), + ReadyToComplete: row.AsInt64("ready_to_complete", 0), + WasReadyToComplete: row.AsInt64("was_ready_to_complete", 0), + TabletAlias: row["tablet"].ToString(), + MigrationContext: row["migration_context"].ToString(), } return onlineDDL, row, nil } @@ -2981,41 +2970,21 @@ func (e *Executor) executeAlterDDLActionMigration(ctx context.Context, onlineDDL // OK, nothing special about this ALTER. Let's go ahead and execute it. 
switch onlineDDL.Strategy { case schema.DDLStrategyOnline, schema.DDLStrategyVitess: - go func() { - e.migrationMutex.Lock() - defer e.migrationMutex.Unlock() - - if err := e.ExecuteWithVReplication(ctx, onlineDDL, nil); err != nil { - failMigration(err) - } - }() + if err := e.ExecuteWithVReplication(ctx, onlineDDL, nil); err != nil { + return failMigration(err) + } case schema.DDLStrategyGhost: - go func() { - e.migrationMutex.Lock() - defer e.migrationMutex.Unlock() - - if err := e.ExecuteWithGhost(ctx, onlineDDL); err != nil { - failMigration(err) - } - }() + if err := e.ExecuteWithGhost(ctx, onlineDDL); err != nil { + return failMigration(err) + } case schema.DDLStrategyPTOSC: - go func() { - e.migrationMutex.Lock() - defer e.migrationMutex.Unlock() - - if err := e.ExecuteWithPTOSC(ctx, onlineDDL); err != nil { - failMigration(err) - } - }() + if err := e.ExecuteWithPTOSC(ctx, onlineDDL); err != nil { + return failMigration(err) + } case schema.DDLStrategyMySQL: - go func() { - e.migrationMutex.Lock() - defer e.migrationMutex.Unlock() - - if _, err := e.executeDirectly(ctx, onlineDDL); err != nil { - failMigration(err) - } - }() + if _, err := e.executeDirectly(ctx, onlineDDL); err != nil { + return failMigration(err) + } default: { return failMigration(fmt.Errorf("Unsupported strategy: %+v", onlineDDL.Strategy)) @@ -3160,14 +3129,9 @@ func (e *Executor) executeMigration(ctx context.Context, onlineDDL *schema.Onlin case sqlparser.AlterDDLAction: return e.executeAlterDDLActionMigration(ctx, onlineDDL) case sqlparser.RevertDDLAction: - go func() { - e.migrationMutex.Lock() - defer e.migrationMutex.Unlock() - - if err := e.executeRevert(ctx, onlineDDL); err != nil { - failMigration(err) - } - }() + if err := e.executeRevert(ctx, onlineDDL); err != nil { + failMigration(err) + } } return nil } @@ -4279,8 +4243,13 @@ func (e *Executor) updateMigrationSetImmediateOperation(ctx context.Context, uui } func (e *Executor) updateMigrationReadyToComplete(ctx 
context.Context, uuid string, isReady bool) error { - query, err := sqlparser.ParseAndBind(sqlUpdateMigrationReadyToComplete, - sqltypes.BoolBindVariable(isReady), + var queryTemplate string + if isReady { + queryTemplate = sqlSetMigrationReadyToComplete + } else { + queryTemplate = sqlClearMigrationReadyToComplete + } + query, err := sqlparser.ParseAndBind(queryTemplate, sqltypes.StringBindVariable(uuid), ) if err != nil { @@ -4294,6 +4263,7 @@ func (e *Executor) updateMigrationReadyToComplete(ctx context.Context, uuid stri var storeValue int64 if isReady { storeValue = 1 + atomic.StoreInt64(&runningMigration.WasReadyToComplete, 1) // WasReadyToComplete is set once and never cleared } atomic.StoreInt64(&runningMigration.ReadyToComplete, storeValue) } diff --git a/go/vt/vttablet/onlineddl/schema.go b/go/vt/vttablet/onlineddl/schema.go index 51f5ca48e7e..90babe05b20 100644 --- a/go/vt/vttablet/onlineddl/schema.go +++ b/go/vt/vttablet/onlineddl/schema.go @@ -95,8 +95,14 @@ const ( WHERE migration_uuid=%a ` - sqlUpdateMigrationReadyToComplete = `UPDATE _vt.schema_migrations - SET ready_to_complete=%a + sqlSetMigrationReadyToComplete = `UPDATE _vt.schema_migrations SET + ready_to_complete=1, + ready_to_complete_timestamp=NOW(6) + WHERE + migration_uuid=%a + ` + sqlClearMigrationReadyToComplete = `UPDATE _vt.schema_migrations SET + ready_to_complete=0 WHERE migration_uuid=%a ` @@ -381,6 +387,7 @@ const ( retain_artifacts_seconds, is_view, ready_to_complete, + ready_to_complete_timestamp is not null as was_ready_to_complete, reverted_uuid, rows_copied, vitess_liveness_indicator, From 69f66ae94b6e4044b46b64bdc1766771a2470b2c Mon Sep 17 00:00:00 2001 From: Shlomi Noach <2607934+shlomi-noach@users.noreply.github.com> Date: Sun, 12 Mar 2023 13:23:08 +0200 Subject: [PATCH 2/4] fix endtoend test Signed-off-by: Shlomi Noach <2607934+shlomi-noach@users.noreply.github.com> --- .../scheduler/onlineddl_scheduler_test.go | 30 +++++++++++-------- 1 file changed, 17 insertions(+), 13 
deletions(-) diff --git a/go/test/endtoend/onlineddl/scheduler/onlineddl_scheduler_test.go b/go/test/endtoend/onlineddl/scheduler/onlineddl_scheduler_test.go index 5b41a4c069c..0da27ec6b13 100644 --- a/go/test/endtoend/onlineddl/scheduler/onlineddl_scheduler_test.go +++ b/go/test/endtoend/onlineddl/scheduler/onlineddl_scheduler_test.go @@ -552,19 +552,10 @@ func testScheduler(t *testing.T) { // both should be still running! onlineddl.CheckMigrationStatus(t, &vtParams, shards, t1uuid, schema.OnlineDDLStatusRunning) onlineddl.CheckMigrationStatus(t, &vtParams, shards, t2uuid, schema.OnlineDDLStatusQueued, schema.OnlineDDLStatusReady) - - { - rs := onlineddl.ReadMigrations(t, &vtParams, t1uuid) - require.NotNil(t, rs) - for _, row := range rs.Named().Rows { - readyToComplete := row.AsInt64("ready_to_complete", 0) - assert.Equal(t, int64(1), readyToComplete) - wasReadyToComplete := row.AsInt64("ready_to_complete_timestamp is not null", 0) - assert.Equal(t, int64(1), wasReadyToComplete) - } - } - { - rs := onlineddl.ReadMigrations(t, &vtParams, t2uuid) + }) + t.Run("chech ready to complete (before)", func(t *testing.T) { + for _, uuid := range []string{t1uuid, t2uuid} { + rs := onlineddl.ReadMigrations(t, &vtParams, uuid) require.NotNil(t, rs) for _, row := range rs.Named().Rows { readyToComplete := row.AsInt64("ready_to_complete", 0) @@ -601,6 +592,19 @@ func testScheduler(t *testing.T) { fmt.Printf("# Migration status (for debug purposes): <%s>\n", status) onlineddl.CheckMigrationStatus(t, &vtParams, shards, t1uuid, schema.OnlineDDLStatusComplete) }) + t.Run("chech ready to complete (after)", func(t *testing.T) { + for _, uuid := range []string{t1uuid, t2uuid} { + rs := onlineddl.ReadMigrations(t, &vtParams, uuid) + require.NotNil(t, rs) + for _, row := range rs.Named().Rows { + readyToComplete := row.AsInt64("ready_to_complete", 0) + assert.Equal(t, int64(1), readyToComplete) + wasReadyToComplete := row.AsInt64("ready_to_complete_timestamp is not null", 0) + 
assert.Equal(t, int64(1), wasReadyToComplete) + } + } + }) + testTableCompletionTimes(t, t2uuid, t1uuid) }) t.Run("REVERT both tables concurrent, postponed", func(t *testing.T) { From 5a13156594e2024a8e2a1a7ffc65478abc946886 Mon Sep 17 00:00:00 2001 From: Shlomi Noach <2607934+shlomi-noach@users.noreply.github.com> Date: Sun, 12 Mar 2023 13:56:48 +0200 Subject: [PATCH 3/4] test: check NULL Signed-off-by: Shlomi Noach <2607934+shlomi-noach@users.noreply.github.com> --- .../onlineddl/scheduler/onlineddl_scheduler_test.go | 6 ++---- 1 file changed, 2 insertions(+), 4 deletions(-) diff --git a/go/test/endtoend/onlineddl/scheduler/onlineddl_scheduler_test.go b/go/test/endtoend/onlineddl/scheduler/onlineddl_scheduler_test.go index 0da27ec6b13..77d7404f985 100644 --- a/go/test/endtoend/onlineddl/scheduler/onlineddl_scheduler_test.go +++ b/go/test/endtoend/onlineddl/scheduler/onlineddl_scheduler_test.go @@ -560,8 +560,7 @@ func testScheduler(t *testing.T) { for _, row := range rs.Named().Rows { readyToComplete := row.AsInt64("ready_to_complete", 0) assert.Equal(t, int64(0), readyToComplete) - wasReadyToComplete := row.AsInt64("ready_to_complete_timestamp is not null", 0) - assert.Equal(t, int64(0), wasReadyToComplete) + assert.True(t, row["ready_to_complete_timestamp"].IsNull()) } } }) @@ -599,8 +598,7 @@ func testScheduler(t *testing.T) { for _, row := range rs.Named().Rows { readyToComplete := row.AsInt64("ready_to_complete", 0) assert.Equal(t, int64(1), readyToComplete) - wasReadyToComplete := row.AsInt64("ready_to_complete_timestamp is not null", 0) - assert.Equal(t, int64(1), wasReadyToComplete) + assert.False(t, row["ready_to_complete_timestamp"].IsNull()) } } }) From fca4fa107b1931443f41f15d5a1cec5b8f9de3cc Mon Sep 17 00:00:00 2001 From: Shlomi Noach <2607934+shlomi-noach@users.noreply.github.com> Date: Sun, 12 Mar 2023 14:14:29 +0200 Subject: [PATCH 4/4] typo Signed-off-by: Shlomi Noach <2607934+shlomi-noach@users.noreply.github.com> --- 
.../endtoend/onlineddl/scheduler/onlineddl_scheduler_test.go | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/go/test/endtoend/onlineddl/scheduler/onlineddl_scheduler_test.go b/go/test/endtoend/onlineddl/scheduler/onlineddl_scheduler_test.go index 77d7404f985..60413d4d1d8 100644 --- a/go/test/endtoend/onlineddl/scheduler/onlineddl_scheduler_test.go +++ b/go/test/endtoend/onlineddl/scheduler/onlineddl_scheduler_test.go @@ -553,7 +553,7 @@ func testScheduler(t *testing.T) { onlineddl.CheckMigrationStatus(t, &vtParams, shards, t1uuid, schema.OnlineDDLStatusRunning) onlineddl.CheckMigrationStatus(t, &vtParams, shards, t2uuid, schema.OnlineDDLStatusQueued, schema.OnlineDDLStatusReady) }) - t.Run("chech ready to complete (before)", func(t *testing.T) { + t.Run("check ready to complete (before)", func(t *testing.T) { for _, uuid := range []string{t1uuid, t2uuid} { rs := onlineddl.ReadMigrations(t, &vtParams, uuid) require.NotNil(t, rs) @@ -591,7 +591,7 @@ func testScheduler(t *testing.T) { fmt.Printf("# Migration status (for debug purposes): <%s>\n", status) onlineddl.CheckMigrationStatus(t, &vtParams, shards, t1uuid, schema.OnlineDDLStatusComplete) }) - t.Run("chech ready to complete (after)", func(t *testing.T) { + t.Run("check ready to complete (after)", func(t *testing.T) { for _, uuid := range []string{t1uuid, t2uuid} { rs := onlineddl.ReadMigrations(t, &vtParams, uuid) require.NotNil(t, rs)