diff --git a/go/vt/vttablet/onlineddl/executor.go b/go/vt/vttablet/onlineddl/executor.go index 67294c3e878..bc8363748bc 100644 --- a/go/vt/vttablet/onlineddl/executor.go +++ b/go/vt/vttablet/onlineddl/executor.go @@ -111,6 +111,8 @@ const ( staleMigrationMinutes = 10 progressPctStarted float64 = 0 progressPctFull float64 = 100.0 + etaSecondsUnknown = -1 + etaSecondsNow = 0 databasePoolSize = 3 cutOverThreshold = 3 * time.Second ) @@ -406,7 +408,7 @@ func (e *Executor) executeDirectly(ctx context.Context, onlineDDL *schema.Online } defer conn.Close() - _ = e.onSchemaMigrationStatus(ctx, onlineDDL.UUID, schema.OnlineDDLStatusRunning, false, progressPctStarted) + _ = e.onSchemaMigrationStatus(ctx, onlineDDL.UUID, schema.OnlineDDLStatusRunning, false, progressPctStarted, etaSecondsUnknown) _, err = conn.ExecuteFetch(onlineDDL.SQL, 0, false) if err != nil { @@ -425,7 +427,7 @@ func (e *Executor) executeDirectly(ctx context.Context, onlineDDL *schema.Online if err != nil { return false, err } - _ = e.onSchemaMigrationStatus(ctx, onlineDDL.UUID, schema.OnlineDDLStatusComplete, false, progressPctFull) + _ = e.onSchemaMigrationStatus(ctx, onlineDDL.UUID, schema.OnlineDDLStatusComplete, false, progressPctFull, etaSecondsNow) return acceptableErrorCodeFound, nil } @@ -574,7 +576,7 @@ func (e *Executor) cutOverVReplMigration(ctx context.Context, s *VReplStream) er }() // Tables are now swapped! Migration is successful - _ = e.onSchemaMigrationStatus(ctx, onlineDDL.UUID, schema.OnlineDDLStatusComplete, false, progressPctFull) + _ = e.onSchemaMigrationStatus(ctx, onlineDDL.UUID, schema.OnlineDDLStatusComplete, false, progressPctFull, etaSecondsNow) return nil // deferred function will re-enable writes now @@ -648,7 +650,7 @@ func (e *Executor) ExecuteWithVReplication(ctx context.Context, onlineDDL *schem atomic.StoreInt64(&e.vreplMigrationRunning, 1) e.lastMigrationUUID = onlineDDL.UUID - if err := e.onSchemaMigrationStatus(ctx, onlineDDL.UUID, schema.OnlineDDLStatusRunning, false, progressPctStarted); err != nil { + if err := e.onSchemaMigrationStatus(ctx, onlineDDL.UUID, schema.OnlineDDLStatusRunning, false, progressPctStarted, etaSecondsUnknown); err != nil { return err } @@ -762,7 +764,7 @@ export ONLINE_DDL_PASSWORD } onHookContent := func(status schema.OnlineDDLStatus) string { return fmt.Sprintf(`#!/bin/bash -curl -s 'http://localhost:%d/schema-migration/report-status?uuid=%s&status=%s&dryrun='"$GH_OST_DRY_RUN"'&progress='"$GH_OST_PROGRESS" +curl -s 'http://localhost:%d/schema-migration/report-status?uuid=%s&status=%s&dryrun='"$GH_OST_DRY_RUN"'&progress='"$GH_OST_PROGRESS"'&eta='"$GH_OST_ETA_SECONDS" `, *servenv.Port, onlineDDL.UUID, string(status)) } if _, err := createTempScript(tempDir, "gh-ost-on-startup", onHookContent(schema.OnlineDDLStatusRunning)); err != nil { @@ -1411,8 +1413,8 @@ func (e *Executor) executeRevert(ctx context.Context, onlineDDL *schema.OnlineDD return fmt.Errorf("cannot run migration %s reverting %s: found %d artifact tables, expected maximum 1", onlineDDL.UUID, revertMigration.UUID, len(artifactTables)) } if len(artifactTables) == 0 { - // This indicates no table was actually created. this must have beena CREATE TABLE IF NOT EXISTS where the table already existed. - _ = e.onSchemaMigrationStatus(ctx, onlineDDL.UUID, schema.OnlineDDLStatusComplete, false, progressPctFull) + // This indicates no table was actually created. this must have been a CREATE TABLE IF NOT EXISTS where the table already existed. + _ = e.onSchemaMigrationStatus(ctx, onlineDDL.UUID, schema.OnlineDDLStatusComplete, false, progressPctFull, etaSecondsNow) } for _, artifactTable := range artifactTables { @@ -1443,7 +1445,7 @@ func (e *Executor) executeRevert(ctx context.Context, onlineDDL *schema.OnlineDD } if len(artifactTables) == 0 { // Could happen on `DROP TABLE IF EXISTS` where the table did not exist... - _ = e.onSchemaMigrationStatus(ctx, onlineDDL.UUID, schema.OnlineDDLStatusComplete, false, progressPctFull) + _ = e.onSchemaMigrationStatus(ctx, onlineDDL.UUID, schema.OnlineDDLStatusComplete, false, progressPctFull, etaSecondsNow) } for _, artifactTable := range artifactTables { if err := e.updateArtifacts(ctx, onlineDDL.UUID, artifactTable); err != nil { @@ -2204,6 +2206,18 @@ func (e *Executor) updateMySQLTable(ctx context.Context, uuid string, tableName return err } +func (e *Executor) updateETASeconds(ctx context.Context, uuid string, etaSeconds int64) error { + query, err := sqlparser.ParseAndBind(sqlUpdateMigrationETASeconds, + sqltypes.Int64BindVariable(etaSeconds), + sqltypes.StringBindVariable(uuid), + ) + if err != nil { + return err + } + _, err = e.execQuery(ctx, query) + return err +} + func (e *Executor) updateMigrationProgress(ctx context.Context, uuid string, progress float64) error { if progress <= 0 { // progress starts at 0, and can only increase. @@ -2211,19 +2225,14 @@ func (e *Executor) updateMigrationProgress(ctx context.Context, uuid string, pro // In both cases there's nothing to update return nil } - parsed := sqlparser.BuildParsedQuery(sqlUpdateMigrationProgress, - ":migration_progress", - ":migration_uuid", + query, err := sqlparser.ParseAndBind(sqlUpdateMigrationProgress, + sqltypes.Float64BindVariable(progress), + sqltypes.StringBindVariable(uuid), ) - bindVars := map[string]*querypb.BindVariable{ - "migration_progress": sqltypes.Float64BindVariable(progress), - "migration_uuid": sqltypes.StringBindVariable(uuid), - } - bound, err := parsed.GenerateQuery(bindVars, nil) if err != nil { return err } - _, err = e.execQuery(ctx, bound) + _, err = e.execQuery(ctx, query) return err } @@ -2240,7 +2249,7 @@ func (e *Executor) retryMigration(ctx context.Context, whereExpr string) (result } // onSchemaMigrationStatus is called when a status is set/changed for a running migration -func (e *Executor) onSchemaMigrationStatus(ctx context.Context, uuid string, status schema.OnlineDDLStatus, dryRun bool, progressPct float64) (err error) { +func (e *Executor) onSchemaMigrationStatus(ctx context.Context, uuid string, status schema.OnlineDDLStatus, dryRun bool, progressPct float64, etaSeconds int64) (err error) { if dryRun && status != schema.OnlineDDLStatusFailed { // We don't consider dry-run reports unless there's a failure return nil @@ -2276,6 +2285,9 @@ func (e *Executor) onSchemaMigrationStatus(ctx context.Context, uuid string, sta if err = e.updateMigrationProgress(ctx, uuid, progressPct); err != nil { return err } + if err = e.updateETASeconds(ctx, uuid, etaSeconds); err != nil { + return err + } if !dryRun { switch status { @@ -2288,15 +2300,19 @@ func (e *Executor) onSchemaMigrationStatus(ctx context.Context, uuid string, sta } // OnSchemaMigrationStatus is called by TabletServer's API, which is invoked by a running gh-ost migration's hooks. -func (e *Executor) OnSchemaMigrationStatus(ctx context.Context, uuidParam, statusParam, dryrunParam, progressParam string) (err error) { +func (e *Executor) OnSchemaMigrationStatus(ctx context.Context, uuidParam, statusParam, dryrunParam, progressParam, etaParam string) (err error) { status := schema.OnlineDDLStatus(statusParam) dryRun := (dryrunParam == "true") var progressPct float64 - if pct, err := strconv.ParseFloat(progressParam, 32); err == nil { + if pct, err := strconv.ParseFloat(progressParam, 64); err == nil { progressPct = pct } + var etaSeconds int64 = etaSecondsUnknown + if eta, err := strconv.ParseInt(etaParam, 10, 64); err == nil { + etaSeconds = eta + } - return e.onSchemaMigrationStatus(ctx, uuidParam, status, dryRun, progressPct) + return e.onSchemaMigrationStatus(ctx, uuidParam, status, dryRun, progressPct, etaSeconds) } // VExec is called by a VExec invocation diff --git a/go/vt/vttablet/onlineddl/schema.go b/go/vt/vttablet/onlineddl/schema.go index 9f80f678f0e..2b0228cc114 100644 --- a/go/vt/vttablet/onlineddl/schema.go +++ b/go/vt/vttablet/onlineddl/schema.go @@ -55,6 +55,7 @@ const ( alterSchemaMigrationsTableDDLAction = "ALTER TABLE _vt.schema_migrations add column ddl_action varchar(16) NOT NULL DEFAULT ''" alterSchemaMigrationsTableMessage = "ALTER TABLE _vt.schema_migrations add column message TEXT NOT NULL" alterSchemaMigrationsTableTableCompleteIndex = "ALTER TABLE _vt.schema_migrations add KEY table_complete_idx (migration_status, keyspace(64), mysql_table(64), completed_timestamp)" + alterSchemaMigrationsTableETASeconds = "ALTER TABLE _vt.schema_migrations add column eta_seconds bigint NOT NULL DEFAULT -1" sqlScheduleSingleMigration = `UPDATE _vt.schema_migrations SET @@ -81,6 +82,11 @@ const ( WHERE migration_uuid=%a ` + sqlUpdateMigrationETASeconds = `UPDATE _vt.schema_migrations + SET eta_seconds=%a + WHERE + migration_uuid=%a + ` sqlUpdateMigrationStartedTimestamp = `UPDATE _vt.schema_migrations SET started_timestamp=IFNULL(started_timestamp, NOW()) WHERE @@ -313,4 +319,5 @@ var applyDDL = []string{ alterSchemaMigrationsTableDDLAction, alterSchemaMigrationsTableMessage, alterSchemaMigrationsTableTableCompleteIndex, + alterSchemaMigrationsTableETASeconds, } diff --git a/go/vt/vttablet/tabletserver/tabletserver.go b/go/vt/vttablet/tabletserver/tabletserver.go index 1e0a5509a14..6c94dd70c23 100644 --- a/go/vt/vttablet/tabletserver/tabletserver.go +++ b/go/vt/vttablet/tabletserver/tabletserver.go @@ -1574,7 +1574,7 @@ func (tsv *TabletServer) registerMigrationStatusHandler() { tsv.exporter.HandleFunc("/schema-migration/report-status", func(w http.ResponseWriter, r *http.Request) { ctx := tabletenv.LocalContext() query := r.URL.Query() - if err := tsv.onlineDDLExecutor.OnSchemaMigrationStatus(ctx, query.Get("uuid"), query.Get("status"), query.Get("dryrun"), query.Get("progress")); err != nil { + if err := tsv.onlineDDLExecutor.OnSchemaMigrationStatus(ctx, query.Get("uuid"), query.Get("status"), query.Get("dryrun"), query.Get("progress"), query.Get("eta")); err != nil { http.Error(w, fmt.Sprintf("not ok: %v", err), http.StatusInternalServerError) return } diff --git a/resources/bin/gh-ost b/resources/bin/gh-ost index 2ec126a8cde..d54349be8da 100644 Binary files a/resources/bin/gh-ost and b/resources/bin/gh-ost differ