Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -605,6 +605,61 @@ func testScheduler(t *testing.T) {
})
})

t.Run("Postpone completion ALTER with shards", func(t *testing.T) {
t1uuid = testOnlineDDLStatement(t, createParams(trivialAlterT1Statement, ddlStrategy+" --postpone-completion", "vtgate", "", "", true)) // skip wait

t.Run("wait for t1 running", func(t *testing.T) {
status := onlineddl.WaitForMigrationStatus(t, &vtParams, shards, t1uuid, normalWaitTime, schema.OnlineDDLStatusRunning)
fmt.Printf("# Migration status (for debug purposes): <%s>\n", status)
})

t.Run("wait for ready_to_complete", func(t *testing.T) {
waitForReadyToComplete(t, t1uuid, true)
rs := onlineddl.ReadMigrations(t, &vtParams, t1uuid)
require.NotNil(t, rs)
for _, row := range rs.Named().Rows {
assert.True(t, row["shadow_analyzed_timestamp"].IsNull())
}
})

t.Run("check postpone_completion", func(t *testing.T) {
rs := onlineddl.ReadMigrations(t, &vtParams, t1uuid)
require.NotNil(t, rs)
for _, row := range rs.Named().Rows {
postponeCompletion := row.AsInt64("postpone_completion", 0)
assert.Equal(t, int64(1), postponeCompletion)
}
})
t.Run("complete with irrelevant shards", func(t *testing.T) {
onlineddl.CheckCompleteMigrationShards(t, &vtParams, shards, t1uuid, "x,y,z", false)
// Added an artificial sleep here just to ensure we're not missing a would-be completion./
time.Sleep(2 * time.Second)
// Migration should still be in running state
Copy link
Contributor

@shlomi-noach shlomi-noach Jul 1, 2025

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Let's add an artificial sleep here just to ensire we're not missing a would-be completion

Suggested change
// Migration should still be in running state
time.Sleep(2 * time.Second)
// Migration should still be in running state

onlineddl.CheckMigrationStatus(t, &vtParams, shards, t1uuid, schema.OnlineDDLStatusRunning)
// postpone_completion should still be set
rs := onlineddl.ReadMigrations(t, &vtParams, t1uuid)
require.NotNil(t, rs)
for _, row := range rs.Named().Rows {
postponeCompletion := row.AsInt64("postpone_completion", 0)
assert.Equal(t, int64(1), postponeCompletion)
}
})
t.Run("complete with relevant shards", func(t *testing.T) {
onlineddl.CheckCompleteMigrationShards(t, &vtParams, shards, t1uuid, "x, y, 1", true)
status := onlineddl.WaitForMigrationStatus(t, &vtParams, shards, t1uuid, normalWaitTime, schema.OnlineDDLStatusComplete, schema.OnlineDDLStatusFailed)
fmt.Printf("# Migration status (for debug purposes): <%s>\n", status)
onlineddl.CheckMigrationStatus(t, &vtParams, shards, t1uuid, schema.OnlineDDLStatusComplete)
})
t.Run("check no postpone_completion", func(t *testing.T) {
rs := onlineddl.ReadMigrations(t, &vtParams, t1uuid)
require.NotNil(t, rs)
for _, row := range rs.Named().Rows {
postponeCompletion := row.AsInt64("postpone_completion", 0)
assert.Equal(t, int64(0), postponeCompletion)
}
})
})

t.Run("Delayed postpone completion ALTER", func(t *testing.T) {
onlineddl.ThrottleAllMigrations(t, &vtParams)
defer onlineddl.UnthrottleAllMigrations(t, &vtParams)
Expand Down
16 changes: 16 additions & 0 deletions go/test/endtoend/onlineddl/vtgate_util.go
Original file line number Diff line number Diff line change
Expand Up @@ -179,6 +179,22 @@ func CheckCompleteMigration(t *testing.T, vtParams *mysql.ConnParams, shards []c
}
}

// CheckCompleteMigrationShards attempts to complete a migration for specific shards, and expects success by counting affected rows
func CheckCompleteMigrationShards(t *testing.T, vtParams *mysql.ConnParams, shards []cluster.Shard, uuid string, completeShards string, expectCompletePossible bool) {
query, err := sqlparser.ParseAndBind("alter vitess_migration %a complete vitess_shards %a",
sqltypes.StringBindVariable(uuid),
sqltypes.StringBindVariable(completeShards),
)
require.NoError(t, err)
r := VtgateExecQuery(t, vtParams, query, "")

if expectCompletePossible {
assert.Equal(t, len(shards), int(r.RowsAffected))
} else {
assert.Equal(t, int(0), int(r.RowsAffected))
}
}

// CheckPostponeCompleteMigration attempts to postpone an existing migration, and expects success by counting affected rows
func CheckPostponeCompleteMigration(t *testing.T, vtParams *mysql.ConnParams, shards []cluster.Shard, uuid string, expectPotponePossible bool) {
query, err := sqlparser.ParseAndBind("alter vitess_migration %a postpone complete",
Expand Down
4 changes: 4 additions & 0 deletions go/vt/sqlparser/parse_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -2583,6 +2583,10 @@ var (
input: "alter vitess_migration launch all",
}, {
input: "alter vitess_migration '9748c3b7_7fdb_11eb_ac2c_f875a4d24e90' complete",
}, {
input: "alter vitess_migration '9748c3b7_7fdb_11eb_ac2c_f875a4d24e90' complete vitess_shards '-40'",
}, {
input: "alter vitess_migration '9748c3b7_7fdb_11eb_ac2c_f875a4d24e90' complete vitess_shards '-40,40-80'",
}, {
input: "alter vitess_migration complete all",
}, {
Expand Down
Loading
Loading