diff --git a/go/vt/vttablet/onlineddl/executor.go b/go/vt/vttablet/onlineddl/executor.go index c1343531c35..6d39b767775 100644 --- a/go/vt/vttablet/onlineddl/executor.go +++ b/go/vt/vttablet/onlineddl/executor.go @@ -41,6 +41,7 @@ import ( "vitess.io/vitess/go/mysql/sqlerror" "vitess.io/vitess/go/sqlescape" "vitess.io/vitess/go/sqltypes" + "vitess.io/vitess/go/stats" "vitess.io/vitess/go/textutil" "vitess.io/vitess/go/timer" "vitess.io/vitess/go/vt/binlog/binlogplayer" @@ -75,6 +76,10 @@ var ( ErrMigrationNotFound = errors.New("migration not found") ) +var ( + staleMigrationMinutesStats = stats.NewGauge("OnlineDDLStaleMigrationMinutes", "longest stale migration in minutes") +) + var ( // fixCompletedTimestampDone fixes a nil `completed_timestamp` columns, see // https://github.com/vitessio/vitess/issues/13927 @@ -115,7 +120,8 @@ func registerOnlineDDLFlags(fs *pflag.FlagSet) { const ( maxPasswordLength = 32 // MySQL's *replication* password may not exceed 32 characters - staleMigrationMinutes = 180 + staleMigrationFailMinutes = 180 + staleMigrationWarningMinutes = 5 progressPctStarted float64 = 0 progressPctFull float64 = 100.0 etaSecondsUnknown = -1 @@ -3121,7 +3127,7 @@ func (e *Executor) reviewRunningMigrations(ctx context.Context) (countRunnning i if _, ok := e.vreplicationLastError[uuid]; !ok { e.vreplicationLastError[uuid] = vterrors.NewLastError( fmt.Sprintf("Online DDL migration %v", uuid), - staleMigrationMinutes*time.Minute, + staleMigrationFailMinutes*time.Minute, ) } lastError := e.vreplicationLastError[uuid] @@ -3251,6 +3257,45 @@ func (e *Executor) reviewRunningMigrations(ctx context.Context) (countRunnning i return countRunnning, cancellable, nil } +// monitorStaleMigrations checks for stale migrations, i.e. migrations that are in 'running' state +// but have not updated their liveness timestamp in past X minutes. It updates the stats +// staleMigrationMinutesStats with the maximum number of stale minutes found, and logs a warning +// for each stale migration found. +func (e *Executor) monitorStaleMigrations(ctx context.Context) error { + e.migrationMutex.Lock() + defer e.migrationMutex.Unlock() + + var maxStaleMinutes int64 + + query, err := sqlparser.ParseAndBind(sqlSelectStaleMigrations, + sqltypes.Int64BindVariable(staleMigrationWarningMinutes), + ) + if err != nil { + return err + } + r, err := e.execQuery(ctx, query) + if err != nil { + return err + } + for _, row := range r.Named().Rows { + uuid := row["migration_uuid"].ToString() + staleMinutes := row.AsInt64("stale_minutes", 0) + + onlineDDL, row, err := e.readMigration(ctx, uuid) + if err != nil { + return err + } + livenessTimestamp := row.AsString("liveness_timestamp", "") + message := fmt.Sprintf("stale migration %s: found running but indicates no liveness for %v minutes, since %v", onlineDDL.UUID, staleMinutes, livenessTimestamp) + log.Warning("warnStaleMigrations: %s", message) + + maxStaleMinutes = max(maxStaleMinutes, staleMinutes) + } + staleMigrationMinutesStats.Set(maxStaleMinutes) + + return nil +} + // reviewStaleMigrations marks as 'failed' migrations whose status is 'running' but which have // shown no liveness in past X minutes. It also attempts to terminate them func (e *Executor) reviewStaleMigrations(ctx context.Context) error { @@ -3258,7 +3303,7 @@ func (e *Executor) reviewStaleMigrations(ctx context.Context) error { defer e.migrationMutex.Unlock() query, err := sqlparser.ParseAndBind(sqlSelectStaleMigrations, - sqltypes.Int64BindVariable(staleMigrationMinutes), + sqltypes.Int64BindVariable(staleMigrationFailMinutes), ) if err != nil { return err @@ -3275,7 +3320,7 @@ func (e *Executor) reviewStaleMigrations(ctx context.Context) error { return err } log.Infof("reviewStaleMigrations: stale migration found: %s", onlineDDL.UUID) - message := fmt.Sprintf("stale migration %s: found running but indicates no liveness in the past %v minutes", onlineDDL.UUID, staleMigrationMinutes) + message := fmt.Sprintf("stale migration %s: found running but indicates no liveness in the past %v minutes", onlineDDL.UUID, staleMigrationFailMinutes) if onlineDDL.TabletAlias != e.TabletAliasString() { // This means another tablet started the migration, and the migration has failed due to the tablet failure (e.g. primary failover) if err := e.updateTabletFailure(ctx, onlineDDL.UUID); err != nil { @@ -3510,6 +3555,9 @@ func (e *Executor) onMigrationCheckTick() { } else if err := e.cancelMigrations(ctx, cancellable, false); err != nil { log.Error(err) } + if err := e.monitorStaleMigrations(ctx); err != nil { + log.Error(err) + } if err := e.reviewStaleMigrations(ctx); err != nil { log.Error(err) } diff --git a/go/vt/vttablet/onlineddl/schema.go b/go/vt/vttablet/onlineddl/schema.go index 7fb42c79c8b..0c2f6c3cb3c 100644 --- a/go/vt/vttablet/onlineddl/schema.go +++ b/go/vt/vttablet/onlineddl/schema.go @@ -357,7 +357,8 @@ const ( LIMIT 1 ` sqlSelectStaleMigrations = `SELECT - migration_uuid + migration_uuid, + timestampdiff(minute, liveness_timestamp, now()) as stale_minutes FROM _vt.schema_migrations WHERE migration_status='running'