Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
56 changes: 52 additions & 4 deletions go/vt/vttablet/onlineddl/executor.go
Original file line number Diff line number Diff line change
Expand Up @@ -41,6 +41,7 @@ import (
"vitess.io/vitess/go/mysql/sqlerror"
"vitess.io/vitess/go/sqlescape"
"vitess.io/vitess/go/sqltypes"
"vitess.io/vitess/go/stats"
"vitess.io/vitess/go/textutil"
"vitess.io/vitess/go/timer"
"vitess.io/vitess/go/vt/binlog/binlogplayer"
Expand Down Expand Up @@ -75,6 +76,10 @@ var (
ErrMigrationNotFound = errors.New("migration not found")
)

var (
staleMigrationMinutesStats = stats.NewGauge("OnlineDDLStaleMigrationMinutes", "longest stale migration in minutes")
)

var (
// fixCompletedTimestampDone fixes a nil `completed_timestamp` columns, see
// https://github.com/vitessio/vitess/issues/13927
Expand Down Expand Up @@ -115,7 +120,8 @@ func registerOnlineDDLFlags(fs *pflag.FlagSet) {

const (
maxPasswordLength = 32 // MySQL's *replication* password may not exceed 32 characters
staleMigrationMinutes = 180
staleMigrationFailMinutes = 180
staleMigrationWarningMinutes = 5
progressPctStarted float64 = 0
progressPctFull float64 = 100.0
etaSecondsUnknown = -1
Expand Down Expand Up @@ -3121,7 +3127,7 @@ func (e *Executor) reviewRunningMigrations(ctx context.Context) (countRunnning i
if _, ok := e.vreplicationLastError[uuid]; !ok {
e.vreplicationLastError[uuid] = vterrors.NewLastError(
fmt.Sprintf("Online DDL migration %v", uuid),
staleMigrationMinutes*time.Minute,
staleMigrationFailMinutes*time.Minute,
)
}
lastError := e.vreplicationLastError[uuid]
Expand Down Expand Up @@ -3251,14 +3257,53 @@ func (e *Executor) reviewRunningMigrations(ctx context.Context) (countRunnning i
return countRunnning, cancellable, nil
}

// monitorStaleMigrations checks for stale migrations, i.e. migrations that are in 'running' state
// but have not updated their liveness timestamp in past X minutes. It updates the stats
// staleMigrationMinutesStats with the maximum number of stale minutes found, and logs a warning
// for each stale migration found.
func (e *Executor) monitorStaleMigrations(ctx context.Context) error {
e.migrationMutex.Lock()
defer e.migrationMutex.Unlock()

var maxStaleMinutes int64

query, err := sqlparser.ParseAndBind(sqlSelectStaleMigrations,
sqltypes.Int64BindVariable(staleMigrationWarningMinutes),
)
if err != nil {
return err
}
r, err := e.execQuery(ctx, query)
if err != nil {
return err
}
for _, row := range r.Named().Rows {
uuid := row["migration_uuid"].ToString()
staleMinutes := row.AsInt64("stale_minutes", 0)

onlineDDL, row, err := e.readMigration(ctx, uuid)
if err != nil {
return err
}
livenessTimestamp := row.AsString("liveness_timestamp", "")
message := fmt.Sprintf("stale migration %s: found running but indicates no liveness for %v minutes, since %v", onlineDDL.UUID, staleMinutes, livenessTimestamp)
log.Warning("warnStaleMigrations: %s", message)

maxStaleMinutes = max(maxStaleMinutes, staleMinutes)
}
staleMigrationMinutesStats.Set(maxStaleMinutes)

return nil
}

// reviewStaleMigrations marks as 'failed' migrations whose status is 'running' but which have
// shown no liveness in past X minutes. It also attempts to terminate them
func (e *Executor) reviewStaleMigrations(ctx context.Context) error {
e.migrationMutex.Lock()
defer e.migrationMutex.Unlock()

query, err := sqlparser.ParseAndBind(sqlSelectStaleMigrations,
sqltypes.Int64BindVariable(staleMigrationMinutes),
sqltypes.Int64BindVariable(staleMigrationFailMinutes),
)
if err != nil {
return err
Expand All @@ -3275,7 +3320,7 @@ func (e *Executor) reviewStaleMigrations(ctx context.Context) error {
return err
}
log.Infof("reviewStaleMigrations: stale migration found: %s", onlineDDL.UUID)
message := fmt.Sprintf("stale migration %s: found running but indicates no liveness in the past %v minutes", onlineDDL.UUID, staleMigrationMinutes)
message := fmt.Sprintf("stale migration %s: found running but indicates no liveness in the past %v minutes", onlineDDL.UUID, staleMigrationFailMinutes)
if onlineDDL.TabletAlias != e.TabletAliasString() {
// This means another tablet started the migration, and the migration has failed due to the tablet failure (e.g. primary failover)
if err := e.updateTabletFailure(ctx, onlineDDL.UUID); err != nil {
Expand Down Expand Up @@ -3510,6 +3555,9 @@ func (e *Executor) onMigrationCheckTick() {
} else if err := e.cancelMigrations(ctx, cancellable, false); err != nil {
log.Error(err)
}
if err := e.monitorStaleMigrations(ctx); err != nil {
log.Error(err)
}
if err := e.reviewStaleMigrations(ctx); err != nil {
log.Error(err)
}
Expand Down
3 changes: 2 additions & 1 deletion go/vt/vttablet/onlineddl/schema.go
Original file line number Diff line number Diff line change
Expand Up @@ -357,7 +357,8 @@ const (
LIMIT 1
`
sqlSelectStaleMigrations = `SELECT
migration_uuid
migration_uuid,
timestampdiff(minute, liveness_timestamp, now()) as stale_minutes
FROM _vt.schema_migrations
WHERE
migration_status='running'
Expand Down
Loading