Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
58 changes: 37 additions & 21 deletions go/vt/vttablet/onlineddl/executor.go
Original file line number Diff line number Diff line change
Expand Up @@ -111,6 +111,8 @@ const (
staleMigrationMinutes = 10
progressPctStarted float64 = 0
progressPctFull float64 = 100.0
etaSecondsUnknown = -1
etaSecondsNow = 0
databasePoolSize = 3
cutOverThreshold = 3 * time.Second
)
Expand Down Expand Up @@ -406,7 +408,7 @@ func (e *Executor) executeDirectly(ctx context.Context, onlineDDL *schema.Online
}
defer conn.Close()

_ = e.onSchemaMigrationStatus(ctx, onlineDDL.UUID, schema.OnlineDDLStatusRunning, false, progressPctStarted)
_ = e.onSchemaMigrationStatus(ctx, onlineDDL.UUID, schema.OnlineDDLStatusRunning, false, progressPctStarted, etaSecondsUnknown)
_, err = conn.ExecuteFetch(onlineDDL.SQL, 0, false)

if err != nil {
Expand All @@ -425,7 +427,7 @@ func (e *Executor) executeDirectly(ctx context.Context, onlineDDL *schema.Online
if err != nil {
return false, err
}
_ = e.onSchemaMigrationStatus(ctx, onlineDDL.UUID, schema.OnlineDDLStatusComplete, false, progressPctFull)
_ = e.onSchemaMigrationStatus(ctx, onlineDDL.UUID, schema.OnlineDDLStatusComplete, false, progressPctFull, etaSecondsNow)

return acceptableErrorCodeFound, nil
}
Expand Down Expand Up @@ -574,7 +576,7 @@ func (e *Executor) cutOverVReplMigration(ctx context.Context, s *VReplStream) er
}()

// Tables are now swapped! Migration is successful
_ = e.onSchemaMigrationStatus(ctx, onlineDDL.UUID, schema.OnlineDDLStatusComplete, false, progressPctFull)
_ = e.onSchemaMigrationStatus(ctx, onlineDDL.UUID, schema.OnlineDDLStatusComplete, false, progressPctFull, etaSecondsNow)
return nil

// deferred function will re-enable writes now
Expand Down Expand Up @@ -648,7 +650,7 @@ func (e *Executor) ExecuteWithVReplication(ctx context.Context, onlineDDL *schem

atomic.StoreInt64(&e.vreplMigrationRunning, 1)
e.lastMigrationUUID = onlineDDL.UUID
if err := e.onSchemaMigrationStatus(ctx, onlineDDL.UUID, schema.OnlineDDLStatusRunning, false, progressPctStarted); err != nil {
if err := e.onSchemaMigrationStatus(ctx, onlineDDL.UUID, schema.OnlineDDLStatusRunning, false, progressPctStarted, etaSecondsUnknown); err != nil {
return err
}

Expand Down Expand Up @@ -762,7 +764,7 @@ export ONLINE_DDL_PASSWORD
}
onHookContent := func(status schema.OnlineDDLStatus) string {
return fmt.Sprintf(`#!/bin/bash
curl -s 'http://localhost:%d/schema-migration/report-status?uuid=%s&status=%s&dryrun='"$GH_OST_DRY_RUN"'&progress='"$GH_OST_PROGRESS"
curl -s 'http://localhost:%d/schema-migration/report-status?uuid=%s&status=%s&dryrun='"$GH_OST_DRY_RUN"'&progress='"$GH_OST_PROGRESS"'&eta='"$GH_OST_ETA_SECONDS"
`, *servenv.Port, onlineDDL.UUID, string(status))
}
if _, err := createTempScript(tempDir, "gh-ost-on-startup", onHookContent(schema.OnlineDDLStatusRunning)); err != nil {
Expand Down Expand Up @@ -1411,8 +1413,8 @@ func (e *Executor) executeRevert(ctx context.Context, onlineDDL *schema.OnlineDD
return fmt.Errorf("cannot run migration %s reverting %s: found %d artifact tables, expected maximum 1", onlineDDL.UUID, revertMigration.UUID, len(artifactTables))
}
if len(artifactTables) == 0 {
// This indicates no table was actually created. this must have beena CREATE TABLE IF NOT EXISTS where the table already existed.
_ = e.onSchemaMigrationStatus(ctx, onlineDDL.UUID, schema.OnlineDDLStatusComplete, false, progressPctFull)
// This indicates no table was actually created. this must have been a CREATE TABLE IF NOT EXISTS where the table already existed.
_ = e.onSchemaMigrationStatus(ctx, onlineDDL.UUID, schema.OnlineDDLStatusComplete, false, progressPctFull, etaSecondsNow)
}

for _, artifactTable := range artifactTables {
Expand Down Expand Up @@ -1443,7 +1445,7 @@ func (e *Executor) executeRevert(ctx context.Context, onlineDDL *schema.OnlineDD
}
if len(artifactTables) == 0 {
// Could happen on `DROP TABLE IF EXISTS` where the table did not exist...
_ = e.onSchemaMigrationStatus(ctx, onlineDDL.UUID, schema.OnlineDDLStatusComplete, false, progressPctFull)
_ = e.onSchemaMigrationStatus(ctx, onlineDDL.UUID, schema.OnlineDDLStatusComplete, false, progressPctFull, etaSecondsNow)
}
for _, artifactTable := range artifactTables {
if err := e.updateArtifacts(ctx, onlineDDL.UUID, artifactTable); err != nil {
Expand Down Expand Up @@ -2204,26 +2206,33 @@ func (e *Executor) updateMySQLTable(ctx context.Context, uuid string, tableName
return err
}

func (e *Executor) updateETASeconds(ctx context.Context, uuid string, etaSeconds int64) error {
query, err := sqlparser.ParseAndBind(sqlUpdateMigrationETASeconds,
sqltypes.Int64BindVariable(etaSeconds),
sqltypes.StringBindVariable(uuid),
)
if err != nil {
return err
}
_, err = e.execQuery(ctx, query)
return err
}

func (e *Executor) updateMigrationProgress(ctx context.Context, uuid string, progress float64) error {
if progress <= 0 {
// progress starts at 0, and can only increase.
// A value of "0" either means "This is the actual current progress" or "No information"
// In both cases there's nothing to update
return nil
}
parsed := sqlparser.BuildParsedQuery(sqlUpdateMigrationProgress,
":migration_progress",
":migration_uuid",
query, err := sqlparser.ParseAndBind(sqlUpdateMigrationProgress,
sqltypes.Float64BindVariable(progress),
sqltypes.StringBindVariable(uuid),
)
bindVars := map[string]*querypb.BindVariable{
"migration_progress": sqltypes.Float64BindVariable(progress),
"migration_uuid": sqltypes.StringBindVariable(uuid),
}
bound, err := parsed.GenerateQuery(bindVars, nil)
if err != nil {
return err
}
_, err = e.execQuery(ctx, bound)
_, err = e.execQuery(ctx, query)
return err
}

Expand All @@ -2240,7 +2249,7 @@ func (e *Executor) retryMigration(ctx context.Context, whereExpr string) (result
}

// onSchemaMigrationStatus is called when a status is set/changed for a running migration
func (e *Executor) onSchemaMigrationStatus(ctx context.Context, uuid string, status schema.OnlineDDLStatus, dryRun bool, progressPct float64) (err error) {
func (e *Executor) onSchemaMigrationStatus(ctx context.Context, uuid string, status schema.OnlineDDLStatus, dryRun bool, progressPct float64, etaSeconds int64) (err error) {
if dryRun && status != schema.OnlineDDLStatusFailed {
// We don't consider dry-run reports unless there's a failure
return nil
Expand Down Expand Up @@ -2276,6 +2285,9 @@ func (e *Executor) onSchemaMigrationStatus(ctx context.Context, uuid string, sta
if err = e.updateMigrationProgress(ctx, uuid, progressPct); err != nil {
return err
}
if err = e.updateETASeconds(ctx, uuid, etaSeconds); err != nil {
return err
}

if !dryRun {
switch status {
Expand All @@ -2288,15 +2300,19 @@ func (e *Executor) onSchemaMigrationStatus(ctx context.Context, uuid string, sta
}

// OnSchemaMigrationStatus is called by TabletServer's API, which is invoked by a running gh-ost migration's hooks.
func (e *Executor) OnSchemaMigrationStatus(ctx context.Context, uuidParam, statusParam, dryrunParam, progressParam string) (err error) {
func (e *Executor) OnSchemaMigrationStatus(ctx context.Context, uuidParam, statusParam, dryrunParam, progressParam, etaParam string) (err error) {
status := schema.OnlineDDLStatus(statusParam)
dryRun := (dryrunParam == "true")
var progressPct float64
if pct, err := strconv.ParseFloat(progressParam, 32); err == nil {
if pct, err := strconv.ParseFloat(progressParam, 64); err == nil {
progressPct = pct
}
var etaSeconds int64 = etaSecondsUnknown
if eta, err := strconv.ParseInt(etaParam, 10, 64); err == nil {
etaSeconds = eta
}

return e.onSchemaMigrationStatus(ctx, uuidParam, status, dryRun, progressPct)
return e.onSchemaMigrationStatus(ctx, uuidParam, status, dryRun, progressPct, etaSeconds)
}

// VExec is called by a VExec invocation
Expand Down
7 changes: 7 additions & 0 deletions go/vt/vttablet/onlineddl/schema.go
Original file line number Diff line number Diff line change
Expand Up @@ -55,6 +55,7 @@ const (
alterSchemaMigrationsTableDDLAction = "ALTER TABLE _vt.schema_migrations add column ddl_action varchar(16) NOT NULL DEFAULT ''"
alterSchemaMigrationsTableMessage = "ALTER TABLE _vt.schema_migrations add column message TEXT NOT NULL"
alterSchemaMigrationsTableTableCompleteIndex = "ALTER TABLE _vt.schema_migrations add KEY table_complete_idx (migration_status, keyspace(64), mysql_table(64), completed_timestamp)"
alterSchemaMigrationsTableETASeconds = "ALTER TABLE _vt.schema_migrations add column eta_seconds bigint NOT NULL DEFAULT -1"

sqlScheduleSingleMigration = `UPDATE _vt.schema_migrations
SET
Expand All @@ -81,6 +82,11 @@ const (
WHERE
migration_uuid=%a
`
sqlUpdateMigrationETASeconds = `UPDATE _vt.schema_migrations
SET eta_seconds=%a
WHERE
migration_uuid=%a
`
sqlUpdateMigrationStartedTimestamp = `UPDATE _vt.schema_migrations
SET started_timestamp=IFNULL(started_timestamp, NOW())
WHERE
Expand Down Expand Up @@ -313,4 +319,5 @@ var applyDDL = []string{
alterSchemaMigrationsTableDDLAction,
alterSchemaMigrationsTableMessage,
alterSchemaMigrationsTableTableCompleteIndex,
alterSchemaMigrationsTableETASeconds,
}
2 changes: 1 addition & 1 deletion go/vt/vttablet/tabletserver/tabletserver.go
Original file line number Diff line number Diff line change
Expand Up @@ -1574,7 +1574,7 @@ func (tsv *TabletServer) registerMigrationStatusHandler() {
tsv.exporter.HandleFunc("/schema-migration/report-status", func(w http.ResponseWriter, r *http.Request) {
ctx := tabletenv.LocalContext()
query := r.URL.Query()
if err := tsv.onlineDDLExecutor.OnSchemaMigrationStatus(ctx, query.Get("uuid"), query.Get("status"), query.Get("dryrun"), query.Get("progress")); err != nil {
if err := tsv.onlineDDLExecutor.OnSchemaMigrationStatus(ctx, query.Get("uuid"), query.Get("status"), query.Get("dryrun"), query.Get("progress"), query.Get("eta")); err != nil {
http.Error(w, fmt.Sprintf("not ok: %v", err), http.StatusInternalServerError)
return
}
Expand Down
Binary file modified resources/bin/gh-ost
Binary file not shown.