Online DDL: ready_to_complete race fix #12612

Merged (4 commits) on Mar 14, 2023
@@ -553,6 +553,17 @@ func testScheduler(t *testing.T) {
onlineddl.CheckMigrationStatus(t, &vtParams, shards, t1uuid, schema.OnlineDDLStatusRunning)
onlineddl.CheckMigrationStatus(t, &vtParams, shards, t2uuid, schema.OnlineDDLStatusQueued, schema.OnlineDDLStatusReady)
})
t.Run("check ready to complete (before)", func(t *testing.T) {
for _, uuid := range []string{t1uuid, t2uuid} {
rs := onlineddl.ReadMigrations(t, &vtParams, uuid)
require.NotNil(t, rs)
for _, row := range rs.Named().Rows {
readyToComplete := row.AsInt64("ready_to_complete", 0)
assert.Equal(t, int64(0), readyToComplete)
assert.True(t, row["ready_to_complete_timestamp"].IsNull())
}
}
})
t.Run("unthrottle, expect t2 running", func(t *testing.T) {
onlineddl.UnthrottleAllMigrations(t, &vtParams)
// t1 should now be ready_to_complete, hence t2 should start running
@@ -580,6 +591,18 @@ func testScheduler(t *testing.T) {
fmt.Printf("# Migration status (for debug purposes): <%s>\n", status)
onlineddl.CheckMigrationStatus(t, &vtParams, shards, t1uuid, schema.OnlineDDLStatusComplete)
})
t.Run("check ready to complete (after)", func(t *testing.T) {
for _, uuid := range []string{t1uuid, t2uuid} {
rs := onlineddl.ReadMigrations(t, &vtParams, uuid)
require.NotNil(t, rs)
for _, row := range rs.Named().Rows {
readyToComplete := row.AsInt64("ready_to_complete", 0)
assert.Equal(t, int64(1), readyToComplete)
assert.False(t, row["ready_to_complete_timestamp"].IsNull())
}
}
})

testTableCompletionTimes(t, t2uuid, t1uuid)
})
t.Run("REVERT both tables concurrent, postponed", func(t *testing.T) {
34 changes: 14 additions & 20 deletions go/vt/schema/online_ddl.go
@@ -24,7 +24,6 @@ import (
"regexp"
"strconv"
"strings"
"time"

vtrpcpb "vitess.io/vitess/go/vt/proto/vtrpc"
"vitess.io/vitess/go/vt/sqlparser"
@@ -84,19 +83,20 @@ const (

// OnlineDDL encapsulates the relevant information in an online schema change request
type OnlineDDL struct {
Keyspace string `json:"keyspace,omitempty"`
Table string `json:"table,omitempty"`
Schema string `json:"schema,omitempty"`
SQL string `json:"sql,omitempty"`
UUID string `json:"uuid,omitempty"`
Strategy DDLStrategy `json:"strategy,omitempty"`
Options string `json:"options,omitempty"`
RequestTime int64 `json:"time_created,omitempty"`
[Review comment by @shlomi-noach (Contributor, Author), Mar 14, 2023] Found that this field (RequestTime) was unused.

MigrationContext string `json:"context,omitempty"`
Status OnlineDDLStatus `json:"status,omitempty"`
TabletAlias string `json:"tablet,omitempty"`
Retries int64 `json:"retries,omitempty"`
ReadyToComplete int64 `json:"ready_to_complete,omitempty"`
Keyspace string `json:"keyspace,omitempty"`
Table string `json:"table,omitempty"`
Schema string `json:"schema,omitempty"`
SQL string `json:"sql,omitempty"`
UUID string `json:"uuid,omitempty"`
Strategy DDLStrategy `json:"strategy,omitempty"`
Options string `json:"options,omitempty"`
// Stateful fields:
MigrationContext string `json:"context,omitempty"`
Status OnlineDDLStatus `json:"status,omitempty"`
TabletAlias string `json:"tablet,omitempty"`
Retries int64 `json:"retries,omitempty"`
ReadyToComplete int64 `json:"ready_to_complete,omitempty"`
WasReadyToComplete int64 `json:"was_ready_to_complete,omitempty"`
[Review comment by a Contributor] Was going to ask if we can't use atomic.Int64 here (and for others where appropriate), but then found golang/go#54582.

}

// FromJSON creates an OnlineDDL from json
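
A note on the atomic.Int64 question above: the PR keeps these as plain int64 fields and accesses them through sync/atomic (see the LoadInt64/StoreInt64 calls in executor.go below), which also keeps the json tags working, since atomic.Int64 has no exported fields and would not marshal as a plain number. A minimal sketch of the pattern, using a hypothetical stand-in struct rather than the real OnlineDDL:

```go
package main

import (
	"encoding/json"
	"fmt"
	"sync/atomic"
)

// migrationState is a hypothetical stand-in for OnlineDDL: a plain int64
// flag that is read and written via sync/atomic, yet still serializes as
// an ordinary JSON number.
type migrationState struct {
	ReadyToComplete int64 `json:"ready_to_complete,omitempty"`
}

func main() {
	m := &migrationState{}

	// Writer side: an atomic store on the plain int64 field, the same
	// pattern the executor uses for ReadyToComplete/WasReadyToComplete.
	atomic.StoreInt64(&m.ReadyToComplete, 1)

	// Reader side: an atomic load, safe against concurrent stores.
	fmt.Println("ready:", atomic.LoadInt64(&m.ReadyToComplete))

	// The field still marshals as a number, as the json tags expect.
	b, _ := json.Marshal(m)
	fmt.Println(string(b)) // {"ready_to_complete":1}
}
```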
@@ -249,7 +249,6 @@ func NewOnlineDDL(keyspace string, table string, sql string, ddlStrategySetting
UUID: onlineDDLUUID,
Strategy: ddlStrategySetting.Strategy,
Options: ddlStrategySetting.Options,
RequestTime: time.Now().UnixNano(),
MigrationContext: migrationContext,
Status: OnlineDDLStatusRequested,
}, nil
@@ -328,11 +327,6 @@ func (onlineDDL *OnlineDDL) StrategySetting() *DDLStrategySetting {
return NewDDLStrategySetting(onlineDDL.Strategy, onlineDDL.Options)
}

// RequestTimeSeconds converts request time to seconds (losing nano precision)
func (onlineDDL *OnlineDDL) RequestTimeSeconds() int64 {
return onlineDDL.RequestTime / int64(time.Second)
}

// ToJSON exports this onlineDDL to JSON
func (onlineDDL *OnlineDDL) ToJSON() ([]byte, error) {
return json.Marshal(onlineDDL)
1 change: 1 addition & 0 deletions go/vt/sidecardb/schema/onlineddl/schema_migrations.sql
@@ -69,6 +69,7 @@ CREATE TABLE IF NOT EXISTS _vt.schema_migrations
`cutover_attempts` int unsigned NOT NULL DEFAULT '0',
`is_immediate_operation` tinyint unsigned NOT NULL DEFAULT '0',
`reviewed_timestamp` timestamp NULL DEFAULT NULL,
`ready_to_complete_timestamp` timestamp NULL DEFAULT NULL,
PRIMARY KEY (`id`),
UNIQUE KEY `uuid_idx` (`migration_uuid`),
KEY `keyspace_shard_idx` (`keyspace`(64), `shard`(64)),
116 changes: 43 additions & 73 deletions go/vt/vttablet/onlineddl/executor.go
@@ -395,7 +395,9 @@ func (e *Executor) proposedMigrationConflictsWithRunningMigration(runningMigrati
// Specifically, if the running migration is an ALTER, and is still busy with copying rows (copy_state), then
// we consider the two to be conflicting. But, if the running migration is done copying rows, and is now only
// applying binary logs, and is up-to-date, then we consider a new ALTER migration to be non-conflicting.
return atomic.LoadInt64(&runningMigration.ReadyToComplete) == 0
if atomic.LoadInt64(&runningMigration.WasReadyToComplete) == 0 {
return true
}
}
return false
}
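
The hunk above is the core of the race fix: the conflict check now consults a sticky WasReadyToComplete flag instead of the live ReadyToComplete value, which the executor may clear again (for example when the migration temporarily falls behind). A minimal sketch of why the sticky flag matters, with hypothetical, heavily simplified types:

```go
package main

import (
	"fmt"
	"sync/atomic"
)

// running mimics the two relevant fields of a running migration.
type running struct {
	ReadyToComplete    int64 // live value; may be cleared again
	WasReadyToComplete int64 // sticky; set once, never cleared
}

// conflicts reports whether a newly proposed ALTER should be held back.
// Checking the sticky flag means a migration that was once ready is never
// re-classified as "still busy copying rows".
func conflicts(r *running) bool {
	return atomic.LoadInt64(&r.WasReadyToComplete) == 0
}

func main() {
	r := &running{}
	fmt.Println(conflicts(r)) // true: the migration has never been ready

	// The migration catches up: both flags get set.
	atomic.StoreInt64(&r.ReadyToComplete, 1)
	atomic.StoreInt64(&r.WasReadyToComplete, 1)

	// Later the live flag is cleared again...
	atomic.StoreInt64(&r.ReadyToComplete, 0)

	// ...but the sticky flag keeps the scheduler from flip-flopping back
	// to "conflicting", which is the race this PR fixes.
	fmt.Println(conflicts(r)) // false
}
```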
@@ -1321,10 +1323,6 @@ func (e *Executor) ExecuteWithVReplication(ctx context.Context, onlineDDL *schem
// make sure there's no vreplication workflow running under same name
_ = e.terminateVReplMigration(ctx, onlineDDL.UUID)

if conflictFound, conflictingMigration := e.isAnyConflictingMigrationRunning(onlineDDL); conflictFound {
return vterrors.Wrapf(ErrExecutorMigrationAlreadyRunning, "conflicting migration: %v over table: %v", conflictingMigration.UUID, conflictingMigration.Table)
}

[Review comment by @shlomi-noach (Contributor, Author)] No need for this double-verification, since we now run synchronously and under the same mutex protection.

if e.tabletTypeFunc() != topodatapb.TabletType_PRIMARY {
return ErrExecutorNotWritableTablet
}
@@ -1428,10 +1426,6 @@ func (e *Executor) ExecuteWithVReplication(ctx context.Context, onlineDDL *schem
// Validation included testing the backend MySQL server and the gh-ost binary itself
// Execution runs first a dry run, then an actual migration
func (e *Executor) ExecuteWithGhost(ctx context.Context, onlineDDL *schema.OnlineDDL) error {
if conflictFound, conflictingMigration := e.isAnyConflictingMigrationRunning(onlineDDL); conflictFound {
return vterrors.Wrapf(ErrExecutorMigrationAlreadyRunning, "conflicting migration: %v over table: %v", conflictingMigration.UUID, conflictingMigration.Table)
}

if e.tabletTypeFunc() != topodatapb.TabletType_PRIMARY {
return ErrExecutorNotWritableTablet
}
@@ -1646,10 +1640,6 @@ exit $exit_code
// Validation included testing the backend MySQL server and the pt-online-schema-change binary itself
// Execution runs first a dry run, then an actual migration
func (e *Executor) ExecuteWithPTOSC(ctx context.Context, onlineDDL *schema.OnlineDDL) error {
if conflictFound, conflictingMigration := e.isAnyConflictingMigrationRunning(onlineDDL); conflictFound {
return vterrors.Wrapf(ErrExecutorMigrationAlreadyRunning, "conflicting migration: %v over table: %v", conflictingMigration.UUID, conflictingMigration.Table)
}

if e.tabletTypeFunc() != topodatapb.TabletType_PRIMARY {
return ErrExecutorNotWritableTablet
}
@@ -1867,15 +1857,13 @@ export MYSQL_PWD

func (e *Executor) readMigration(ctx context.Context, uuid string) (onlineDDL *schema.OnlineDDL, row sqltypes.RowNamedValues, err error) {

parsed := sqlparser.BuildParsedQuery(sqlSelectMigration, ":migration_uuid")
bindVars := map[string]*querypb.BindVariable{
"migration_uuid": sqltypes.StringBindVariable(uuid),
}
bound, err := parsed.GenerateQuery(bindVars, nil)
query, err := sqlparser.ParseAndBind(sqlSelectMigration,
sqltypes.StringBindVariable(uuid),
)
if err != nil {
return onlineDDL, nil, err
}
r, err := e.execQuery(ctx, bound)
r, err := e.execQuery(ctx, query)
if err != nil {
return onlineDDL, nil, err
}
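
For readers unfamiliar with the helper adopted here: sqlparser.ParseAndBind collapses the previous BuildParsedQuery + GenerateQuery two-step into a single call, binding values positionally into %a placeholders. A small sketch, assuming the vitess module is on the import path and using a simplified stand-in for the real sqlSelectMigration template:

```go
package main

import (
	"fmt"

	"vitess.io/vitess/go/sqltypes"
	"vitess.io/vitess/go/vt/sqlparser"
)

// A simplified stand-in for sqlSelectMigration: one positional %a
// placeholder, bound in order by ParseAndBind.
const sqlSelectMigrationDemo = `SELECT * FROM _vt.schema_migrations WHERE migration_uuid=%a`

func main() {
	query, err := sqlparser.ParseAndBind(sqlSelectMigrationDemo,
		sqltypes.StringBindVariable("aaaaaaaa-bbbb-cccc-dddd-eeeeeeeeeeee"),
	)
	if err != nil {
		panic(err)
	}
	// The UUID is inlined as a properly escaped SQL string literal.
	fmt.Println(query)
}
```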
@@ -1885,18 +1873,19 @@ func (e *Executor) readMigration(ctx context.Context, uuid string) (onlineDDL *s
return nil, nil, ErrMigrationNotFound
}
onlineDDL = &schema.OnlineDDL{
Keyspace: row["keyspace"].ToString(),
Table: row["mysql_table"].ToString(),
Schema: row["mysql_schema"].ToString(),
SQL: row["migration_statement"].ToString(),
UUID: row["migration_uuid"].ToString(),
Strategy: schema.DDLStrategy(row["strategy"].ToString()),
Options: row["options"].ToString(),
Status: schema.OnlineDDLStatus(row["migration_status"].ToString()),
Retries: row.AsInt64("retries", 0),
ReadyToComplete: row.AsInt64("ready_to_complete", 0),
TabletAlias: row["tablet"].ToString(),
MigrationContext: row["migration_context"].ToString(),
Keyspace: row["keyspace"].ToString(),
Table: row["mysql_table"].ToString(),
Schema: row["mysql_schema"].ToString(),
SQL: row["migration_statement"].ToString(),
UUID: row["migration_uuid"].ToString(),
Strategy: schema.DDLStrategy(row["strategy"].ToString()),
Options: row["options"].ToString(),
Status: schema.OnlineDDLStatus(row["migration_status"].ToString()),
Retries: row.AsInt64("retries", 0),
ReadyToComplete: row.AsInt64("ready_to_complete", 0),
WasReadyToComplete: row.AsInt64("was_ready_to_complete", 0),
TabletAlias: row["tablet"].ToString(),
MigrationContext: row["migration_context"].ToString(),
}
return onlineDDL, row, nil
}
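
A note on the row accessors used in readMigration: RowNamedValues.AsInt64 falls back to the supplied default when a column is missing or NULL, which is why the new was_ready_to_complete column can be read unconditionally. A short sketch using vitess test helpers (hypothetical column values, not taken from a real migration):

```go
package main

import (
	"fmt"

	"vitess.io/vitess/go/sqltypes"
)

func main() {
	// MakeTestFields/MakeTestResult are test helpers for building results
	// by hand; the executor gets the real rows from _vt.schema_migrations.
	result := sqltypes.MakeTestResult(
		sqltypes.MakeTestFields(
			"migration_uuid|ready_to_complete|was_ready_to_complete",
			"varchar|int64|int64",
		),
		"aaaa-bbbb|0|1",
	)
	row := result.Named().Rows[0]

	// AsInt64 returns the supplied default if the column is absent or
	// NULL, so callers don't need to special-case older schemas.
	fmt.Println(row.AsInt64("ready_to_complete", 0))     // 0
	fmt.Println(row.AsInt64("was_ready_to_complete", 0)) // 1
	fmt.Println(row.AsInt64("missing_column", -1))       // -1
}
```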
@@ -2981,41 +2970,21 @@ func (e *Executor) executeAlterDDLActionMigration(ctx context.Context, onlineDDL
// OK, nothing special about this ALTER. Let's go ahead and execute it.
switch onlineDDL.Strategy {
case schema.DDLStrategyOnline, schema.DDLStrategyVitess:
go func() {
e.migrationMutex.Lock()
defer e.migrationMutex.Unlock()

if err := e.ExecuteWithVReplication(ctx, onlineDDL, nil); err != nil {
failMigration(err)
}
}()
if err := e.ExecuteWithVReplication(ctx, onlineDDL, nil); err != nil {
return failMigration(err)
}
case schema.DDLStrategyGhost:
go func() {
e.migrationMutex.Lock()
defer e.migrationMutex.Unlock()

if err := e.ExecuteWithGhost(ctx, onlineDDL); err != nil {
failMigration(err)
}
}()
if err := e.ExecuteWithGhost(ctx, onlineDDL); err != nil {
return failMigration(err)
}
case schema.DDLStrategyPTOSC:
go func() {
e.migrationMutex.Lock()
defer e.migrationMutex.Unlock()

if err := e.ExecuteWithPTOSC(ctx, onlineDDL); err != nil {
failMigration(err)
}
}()
if err := e.ExecuteWithPTOSC(ctx, onlineDDL); err != nil {
return failMigration(err)
}
case schema.DDLStrategyMySQL:
go func() {
e.migrationMutex.Lock()
defer e.migrationMutex.Unlock()

if _, err := e.executeDirectly(ctx, onlineDDL); err != nil {
failMigration(err)
}
}()
if _, err := e.executeDirectly(ctx, onlineDDL); err != nil {
return failMigration(err)
}
default:
{
return failMigration(fmt.Errorf("Unsupported strategy: %+v", onlineDDL.Strategy))
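
The goroutine-plus-lock wrappers removed above are the "run synchronously and under the same mutex protection" referenced in the earlier review comment: because the strategy handlers now run inline, presumably while the caller already holds migrationMutex, the conflict re-checks deleted from ExecuteWithVReplication/ExecuteWithGhost/ExecuteWithPTOSC are no longer needed. A heavily simplified, hypothetical sketch of the before/after shape:

```go
package main

import (
	"fmt"
	"sync"
)

// executor is a hypothetical stand-in; the real state lives in
// onlineddl.Executor and _vt.schema_migrations.
type executor struct {
	mu      sync.Mutex
	running map[string]bool // table -> has a running migration
}

// submitOld mirrors the removed pattern: the handler ran in its own
// goroutine and took the lock itself, so the conflict check the caller did
// earlier could be stale by the time the handler actually ran, forcing a
// second check inside.
func (e *executor) submitOld(table string, run func()) {
	go func() {
		e.mu.Lock()
		defer e.mu.Unlock()
		if e.running[table] { // the "double-verification"
			return
		}
		e.running[table] = true
		run()
	}()
}

// submitNew mirrors the new pattern: the handler runs synchronously while
// the caller holds the lock, so the caller's conflict check is still valid
// and no re-verification is required.
func (e *executor) submitNew(table string, run func()) error {
	e.running[table] = true
	run()
	return nil
}

func main() {
	e := &executor{running: map[string]bool{}}
	e.mu.Lock()
	_ = e.submitNew("t1", func() { fmt.Println("running migration on t1") })
	e.mu.Unlock()
}
```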
@@ -3160,14 +3129,9 @@ func (e *Executor) executeMigration(ctx context.Context, onlineDDL *schema.Onlin
case sqlparser.AlterDDLAction:
return e.executeAlterDDLActionMigration(ctx, onlineDDL)
case sqlparser.RevertDDLAction:
go func() {
e.migrationMutex.Lock()
defer e.migrationMutex.Unlock()

if err := e.executeRevert(ctx, onlineDDL); err != nil {
failMigration(err)
}
}()
if err := e.executeRevert(ctx, onlineDDL); err != nil {
failMigration(err)
}
}
return nil
}
@@ -4279,8 +4243,13 @@ func (e *Executor) updateMigrationSetImmediateOperation(ctx context.Context, uui
}

func (e *Executor) updateMigrationReadyToComplete(ctx context.Context, uuid string, isReady bool) error {
query, err := sqlparser.ParseAndBind(sqlUpdateMigrationReadyToComplete,
sqltypes.BoolBindVariable(isReady),
var queryTemplate string
if isReady {
queryTemplate = sqlSetMigrationReadyToComplete
} else {
queryTemplate = sqlClearMigrationReadyToComplete
}
query, err := sqlparser.ParseAndBind(queryTemplate,
sqltypes.StringBindVariable(uuid),
)
if err != nil {
@@ -4294,6 +4263,7 @@ func (e *Executor) updateMigrationReadyToComplete(ctx context.Context, uuid stri
var storeValue int64
if isReady {
storeValue = 1
atomic.StoreInt64(&runningMigration.WasReadyToComplete, 1) // WasReadyToComplete is set once and never cleared
}
atomic.StoreInt64(&runningMigration.ReadyToComplete, storeValue)
}
11 changes: 9 additions & 2 deletions go/vt/vttablet/onlineddl/schema.go
@@ -95,8 +95,14 @@ const (
WHERE
migration_uuid=%a
`
sqlUpdateMigrationReadyToComplete = `UPDATE _vt.schema_migrations
SET ready_to_complete=%a
sqlSetMigrationReadyToComplete = `UPDATE _vt.schema_migrations SET
ready_to_complete=1,
ready_to_complete_timestamp=NOW(6)
WHERE
migration_uuid=%a
`
sqlClearMigrationReadyToComplete = `UPDATE _vt.schema_migrations SET
ready_to_complete=0
WHERE
migration_uuid=%a
`
@@ -381,6 +387,7 @@ const (
retain_artifacts_seconds,
is_view,
ready_to_complete,
ready_to_complete_timestamp is not null as was_ready_to_complete,
reverted_uuid,
rows_copied,
vitess_liveness_indicator,
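
The last two hunks tie together: sqlSetMigrationReadyToComplete stamps ready_to_complete_timestamp, sqlClearMigrationReadyToComplete deliberately leaves it untouched, and the SELECT derives was_ready_to_complete from `ready_to_complete_timestamp is not null`, so the sticky flag survives clears (and, via the table, tablet restarts). A tiny simulation of that behavior, with hypothetical Go types standing in for the _vt.schema_migrations row:

```go
package main

import "fmt"

// migrationRow models the two columns involved; a nil pointer stands in
// for SQL NULL.
type migrationRow struct {
	readyToComplete          int64
	readyToCompleteTimestamp *string
}

// setReady mirrors sqlSetMigrationReadyToComplete: set the flag and stamp
// the timestamp.
func setReady(r *migrationRow, now string) {
	r.readyToComplete = 1
	r.readyToCompleteTimestamp = &now
}

// clearReady mirrors sqlClearMigrationReadyToComplete: clear the flag but
// leave the timestamp alone.
func clearReady(r *migrationRow) {
	r.readyToComplete = 0
}

// wasReadyToComplete mirrors the "ready_to_complete_timestamp is not null"
// expression in the SELECT above.
func wasReadyToComplete(r *migrationRow) bool {
	return r.readyToCompleteTimestamp != nil
}

func main() {
	r := &migrationRow{}
	setReady(r, "2023-03-14 00:00:00.000000")
	clearReady(r) // e.g. the migration temporarily stops being ready
	fmt.Println(r.readyToComplete, wasReadyToComplete(r)) // 0 true
}
```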