Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion go/test/endtoend/onlineddl/ghost/onlineddl_ghost_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -272,7 +272,7 @@ func TestSchemaChange(t *testing.T) {
onlineddl.CheckMigrationStatus(t, &vtParams, shards, uuid, schema.OnlineDDLStatusRunning)
onlineddl.CheckCancelMigration(t, &vtParams, shards, uuid, true)
time.Sleep(2 * time.Second)
onlineddl.CheckMigrationStatus(t, &vtParams, shards, uuid, schema.OnlineDDLStatusFailed)
onlineddl.CheckMigrationStatus(t, &vtParams, shards, uuid, schema.OnlineDDLStatusCancelled)
})
t.Run("failed migration", func(t *testing.T) {
uuid := testOnlineDDLStatement(t, alterTableFailedStatement, "gh-ost", "vtgate", "ghost_col", "")
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -401,7 +401,7 @@ func TestSchemaChange(t *testing.T) {
// let's cancel it
onlineddl.CheckCancelMigration(t, &vtParams, shards, drop1uuid, true)
time.Sleep(2 * time.Second)
onlineddl.CheckMigrationStatus(t, &vtParams, shards, drop1uuid, schema.OnlineDDLStatusFailed)
onlineddl.CheckMigrationStatus(t, &vtParams, shards, drop1uuid, schema.OnlineDDLStatusCancelled)
})
t.Run("complete t1", func(t *testing.T) {
// t1 should be still running!
Expand Down Expand Up @@ -511,7 +511,7 @@ func TestSchemaChange(t *testing.T) {
// let's cancel it
onlineddl.CheckCancelMigration(t, &vtParams, shards, drop1uuid, true)
time.Sleep(2 * time.Second)
onlineddl.CheckMigrationStatus(t, &vtParams, shards, drop1uuid, schema.OnlineDDLStatusFailed)
onlineddl.CheckMigrationStatus(t, &vtParams, shards, drop1uuid, schema.OnlineDDLStatusCancelled)
})
t.Run("complete t1", func(t *testing.T) {
// t1 should be still running!
Expand All @@ -537,7 +537,7 @@ func TestSchemaChange(t *testing.T) {
// let's cancel it
onlineddl.CheckCancelMigration(t, &vtParams, shards, uuid, true)
time.Sleep(2 * time.Second)
onlineddl.CheckMigrationStatus(t, &vtParams, shards, uuid, schema.OnlineDDLStatusFailed)
onlineddl.CheckMigrationStatus(t, &vtParams, shards, uuid, schema.OnlineDDLStatusCancelled)
})

// now, we submit the exact same migratoin again: same UUID, same migration context.
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -197,7 +197,7 @@ func TestSchemaChange(t *testing.T) {
onlineddl.CheckMigrationStatus(t, &vtParams, shards, throttledUUID, schema.OnlineDDLStatusRunning)
onlineddl.CheckCancelMigration(t, &vtParams, shards, throttledUUID, true)
time.Sleep(2 * time.Second)
onlineddl.CheckMigrationStatus(t, &vtParams, shards, throttledUUID, schema.OnlineDDLStatusFailed)
onlineddl.CheckMigrationStatus(t, &vtParams, shards, throttledUUID, schema.OnlineDDLStatusCancelled)
})
t.Run("successful gh-ost alter, vtctl", func(t *testing.T) {
uuid := testOnlineDDLStatement(t, alterTableTrivialStatement, "gh-ost --singleton", "vtctl", "", "hint_col", "", false)
Expand Down Expand Up @@ -294,9 +294,9 @@ func TestSchemaChange(t *testing.T) {
// revert is running
_ = testOnlineDDLStatement(t, dropNonexistentTableStatement, "vitess --allow-concurrent --singleton-context", "vtctl", "migrate:ctx", "", "rejected", true)
onlineddl.CheckCancelMigration(t, &vtParams, shards, revertUUID, true)
status := onlineddl.WaitForMigrationStatus(t, &vtParams, shards, revertUUID, 20*time.Second, schema.OnlineDDLStatusComplete, schema.OnlineDDLStatusFailed)
status := onlineddl.WaitForMigrationStatus(t, &vtParams, shards, revertUUID, 20*time.Second, schema.OnlineDDLStatusComplete, schema.OnlineDDLStatusFailed, schema.OnlineDDLStatusCancelled)
fmt.Printf("# Migration status (for debug purposes): <%s>\n", status)
onlineddl.CheckMigrationStatus(t, &vtParams, shards, revertUUID, schema.OnlineDDLStatusFailed)
onlineddl.CheckMigrationStatus(t, &vtParams, shards, revertUUID, schema.OnlineDDLStatusCancelled)
})
t.Run("success concurrent singleton-context with no-context revert", func(t *testing.T) {
revertUUID := testRevertMigration(t, uuids[len(uuids)-1], "vtctl", "vitess --allow-concurrent --postpone-completion", "rev:ctx", "", false)
Expand All @@ -306,9 +306,9 @@ func TestSchemaChange(t *testing.T) {
uuids = append(uuids, uuid)
onlineddl.CheckMigrationStatus(t, &vtParams, shards, uuid, schema.OnlineDDLStatusComplete)
onlineddl.CheckCancelMigration(t, &vtParams, shards, revertUUID, true)
status := onlineddl.WaitForMigrationStatus(t, &vtParams, shards, revertUUID, 20*time.Second, schema.OnlineDDLStatusComplete, schema.OnlineDDLStatusFailed)
status := onlineddl.WaitForMigrationStatus(t, &vtParams, shards, revertUUID, 20*time.Second, schema.OnlineDDLStatusComplete, schema.OnlineDDLStatusFailed, schema.OnlineDDLStatusCancelled)
fmt.Printf("# Migration status (for debug purposes): <%s>\n", status)
onlineddl.CheckMigrationStatus(t, &vtParams, shards, revertUUID, schema.OnlineDDLStatusFailed)
onlineddl.CheckMigrationStatus(t, &vtParams, shards, revertUUID, schema.OnlineDDLStatusCancelled)
})
}

Expand Down
2 changes: 1 addition & 1 deletion go/test/endtoend/onlineddl/vrepl/onlineddl_vrepl_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -351,7 +351,7 @@ func TestSchemaChange(t *testing.T) {
testRows(t)
onlineddl.CheckCancelMigration(t, &vtParams, shards, uuid, true)
time.Sleep(2 * time.Second)
onlineddl.CheckMigrationStatus(t, &vtParams, shards, uuid, schema.OnlineDDLStatusFailed)
onlineddl.CheckMigrationStatus(t, &vtParams, shards, uuid, schema.OnlineDDLStatusCancelled)
})

t.Run("throttled and unthrottled migration", func(t *testing.T) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -250,7 +250,7 @@ func testSingle(t *testing.T, testName string) {

if expectedErrorMessage, exists := readTestFile(t, testName, "expect_failure"); exists {
// Failure is expected!
assert.Equal(t, string(schema.OnlineDDLStatusFailed), migrationStatus)
assert.Contains(t, []string{string(schema.OnlineDDLStatusFailed), string(schema.OnlineDDLStatusCancelled)}, migrationStatus)
require.Contains(t, migrationMessage, expectedErrorMessage, "expected error message (%s) to contain (%s)", migrationMessage, expectedErrorMessage)
// no need to proceed to checksum or anything further
return
Expand Down Expand Up @@ -319,7 +319,7 @@ func waitForMigration(t *testing.T, uuid string, timeout time.Duration) sqltypes
row := readMigration(t, uuid)
status = row["migration_status"].ToString()
switch status {
case string(schema.OnlineDDLStatusComplete), string(schema.OnlineDDLStatusFailed):
case string(schema.OnlineDDLStatusComplete), string(schema.OnlineDDLStatusFailed), string(schema.OnlineDDLStatusCancelled):
// migration is complete, either successful or not
return row
}
Expand Down
41 changes: 26 additions & 15 deletions go/vt/vttablet/onlineddl/executor.go
Original file line number Diff line number Diff line change
Expand Up @@ -1420,9 +1420,8 @@ exit $exit_code

log.Infof("Will now dry-run gh-ost on: %s:%d", variables.host, variables.port)
if err := runGhost(false); err != nil {
// perhaps gh-ost was interrupted midway and didn't have the chance to send a "failes" status
_ = e.updateMigrationStatus(ctx, onlineDDL.UUID, schema.OnlineDDLStatusFailed)
_ = e.updateMigrationMessage(ctx, onlineDDL.UUID, err.Error())
// perhaps gh-ost was interrupted midway and didn't have the chance to send a "failed" status
_ = e.failMigration(ctx, onlineDDL, err)

log.Errorf("Error executing gh-ost dry run: %+v", err)
return err
Expand All @@ -1433,8 +1432,7 @@ exit $exit_code
startedMigrations.Add(1)
if err := runGhost(true); err != nil {
// perhaps gh-ost was interrupted midway and didn't have the chance to send a "failes" status
_ = e.updateMigrationStatus(ctx, onlineDDL.UUID, schema.OnlineDDLStatusFailed)
_ = e.updateMigrationMessage(ctx, onlineDDL.UUID, err.Error())
_ = e.failMigration(ctx, onlineDDL, err)
failedMigrations.Add(1)
log.Errorf("Error running gh-ost: %+v", err)
return err
Expand Down Expand Up @@ -1644,8 +1642,7 @@ export MYSQL_PWD
log.Infof("Will now dry-run pt-online-schema-change on: %s:%d", variables.host, variables.port)
if err := runPTOSC(false); err != nil {
// perhaps pt-osc was interrupted midway and didn't have the chance to send a "failes" status
_ = e.updateMigrationStatus(ctx, onlineDDL.UUID, schema.OnlineDDLStatusFailed)
_ = e.updateMigrationMessage(ctx, onlineDDL.UUID, err.Error())
_ = e.failMigration(ctx, onlineDDL, err)
_ = e.updateMigrationTimestamp(ctx, "completed_timestamp", onlineDDL.UUID)
log.Errorf("Error executing pt-online-schema-change dry run: %+v", err)
return err
Expand All @@ -1656,8 +1653,7 @@ export MYSQL_PWD
startedMigrations.Add(1)
if err := runPTOSC(true); err != nil {
// perhaps pt-osc was interrupted midway and didn't have the chance to send a "failes" status
_ = e.updateMigrationStatus(ctx, onlineDDL.UUID, schema.OnlineDDLStatusFailed)
_ = e.updateMigrationMessage(ctx, onlineDDL.UUID, err.Error())
_ = e.failMigration(ctx, onlineDDL, err)
_ = e.updateMigrationTimestamp(ctx, "completed_timestamp", onlineDDL.UUID)
_ = e.dropPTOSCMigrationTriggers(ctx, onlineDDL)
failedMigrations.Add(1)
Expand Down Expand Up @@ -1738,7 +1734,6 @@ func (e *Executor) terminateMigration(ctx context.Context, onlineDDL *schema.Onl
if err := e.terminateVReplMigration(ctx, onlineDDL.UUID); err != nil {
return foundRunning, fmt.Errorf("Error terminating migration, vreplication exec error: %+v", err)
}
_ = e.updateMigrationStatus(ctx, onlineDDL.UUID, schema.OnlineDDLStatusFailed)
case schema.DDLStrategyPTOSC:
// see if pt-osc is running (could have been executed by this vttablet or one that crashed in the past)
if running, pid, _ := e.isPTOSCMigrationRunning(ctx, onlineDDL.UUID); running {
Expand Down Expand Up @@ -1794,18 +1789,22 @@ func (e *Executor) CancelMigration(ctx context.Context, uuid string, message str
case schema.OnlineDDLStatusComplete, schema.OnlineDDLStatusFailed:
log.Infof("CancelMigration: migration %s is in non-cancellable status: %v", uuid, onlineDDL.Status)
return emptyResult, nil
}
// From this point on, we're actually cancelling a migration
_ = e.updateMigrationTimestamp(ctx, "cancelled_timestamp", uuid)
defer e.failMigration(ctx, onlineDDL, errors.New(message))
defer e.triggerNextCheckInterval()

switch onlineDDL.Status {
case schema.OnlineDDLStatusQueued, schema.OnlineDDLStatusReady:
log.Infof("CancelMigration: cancelling %s with status: %v", uuid, onlineDDL.Status)
if err := e.updateMigrationStatus(ctx, onlineDDL.UUID, schema.OnlineDDLStatusCancelled); err != nil {
return nil, err
}
rowsAffected = 1
return &sqltypes.Result{RowsAffected: 1}, nil
}
defer e.triggerNextCheckInterval()

migrationFound, err := e.terminateMigration(ctx, onlineDDL)
defer e.updateMigrationMessage(ctx, onlineDDL.UUID, message)

if migrationFound {
log.Infof("CancelMigration: terminated %s with status: %v", uuid, onlineDDL.Status)
rowsAffected = 1
Expand Down Expand Up @@ -2341,7 +2340,7 @@ func (e *Executor) getCompletedMigrationByContextAndSQL(ctx context.Context, onl

// failMigration marks a migration as failed
func (e *Executor) failMigration(ctx context.Context, onlineDDL *schema.OnlineDDL, err error) error {
_ = e.updateMigrationStatus(ctx, onlineDDL.UUID, schema.OnlineDDLStatusFailed)
_ = e.updateMigrationStatusFailedOrCancelled(ctx, onlineDDL.UUID)
if err != nil {
_ = e.updateMigrationMessage(ctx, onlineDDL.UUID, err.Error())
}
Expand Down Expand Up @@ -3715,6 +3714,18 @@ func (e *Executor) updateTabletFailure(ctx context.Context, uuid string) error {
return err
}

func (e *Executor) updateMigrationStatusFailedOrCancelled(ctx context.Context, uuid string) error {
log.Infof("updateMigrationStatus: transitioning migration: %s into status failed or cancelled", uuid)
query, err := sqlparser.ParseAndBind(sqlUpdateMigrationStatusFailedOrCancelled,
sqltypes.StringBindVariable(uuid),
)
if err != nil {
return err
}
_, err = e.execQuery(ctx, query)
return err
}

func (e *Executor) updateMigrationStatus(ctx context.Context, uuid string, status schema.OnlineDDLStatus) error {
log.Infof("updateMigrationStatus: transitioning migration: %s into status: %s", uuid, string(status))
query, err := sqlparser.ParseAndBind(sqlUpdateMigrationStatus,
Expand Down
10 changes: 10 additions & 0 deletions go/vt/vttablet/onlineddl/schema.go
Original file line number Diff line number Diff line change
Expand Up @@ -79,6 +79,7 @@ const (
alterSchemaMigrationsTableSpecialPlan = "ALTER TABLE _vt.schema_migrations add column special_plan text NOT NULL"
alterSchemaMigrationsLastThrottled = "ALTER TABLE _vt.schema_migrations add column last_throttled_timestamp timestamp NULL DEFAULT NULL"
alterSchemaMigrationsComponentThrottled = "ALTER TABLE _vt.schema_migrations add column component_throttled tinytext NOT NULL"
alterSchemaMigrationsCancelledTimestamp = "ALTER TABLE _vt.schema_migrations add column cancelled_timestamp timestamp NULL DEFAULT NULL"

sqlInsertMigration = `INSERT IGNORE INTO _vt.schema_migrations (
migration_uuid,
Expand Down Expand Up @@ -122,6 +123,11 @@ const (
WHERE
migration_uuid=%a
`
sqlUpdateMigrationStatusFailedOrCancelled = `UPDATE _vt.schema_migrations
SET migration_status=IF(cancelled_timestamp IS NULL, 'failed', 'cancelled')
WHERE
migration_uuid=%a
`
sqlUpdateMigrationProgress = `UPDATE _vt.schema_migrations
SET progress=%a
WHERE
Expand Down Expand Up @@ -272,6 +278,7 @@ const (
ready_timestamp=NULL,
started_timestamp=NULL,
liveness_timestamp=NULL,
cancelled_timestamp=NULL,
completed_timestamp=NULL,
cleanup_timestamp=NULL
WHERE
Expand All @@ -289,6 +296,7 @@ const (
ready_timestamp=NULL,
started_timestamp=NULL,
liveness_timestamp=NULL,
cancelled_timestamp=NULL,
completed_timestamp=NULL,
cleanup_timestamp=NULL
WHERE
Expand Down Expand Up @@ -410,6 +418,7 @@ const (
vitess_liveness_indicator,
user_throttle_ratio,
last_throttled_timestamp,
cancelled_timestamp,
component_throttled,
postpone_completion
FROM _vt.schema_migrations
Expand Down Expand Up @@ -627,4 +636,5 @@ var ApplyDDL = []string{
alterSchemaMigrationsTableSpecialPlan,
alterSchemaMigrationsLastThrottled,
alterSchemaMigrationsComponentThrottled,
alterSchemaMigrationsCancelledTimestamp,
}