Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
41 changes: 22 additions & 19 deletions go/mysql/capabilities/capability.go
Original file line number Diff line number Diff line change
Expand Up @@ -31,25 +31,26 @@ var (
type FlavorCapability int

const (
NoneFlavorCapability FlavorCapability = iota // default placeholder
FastDropTableFlavorCapability // supported in MySQL 8.0.23 and above: https://dev.mysql.com/doc/relnotes/mysql/8.0/en/news-8-0-23.html
TransactionalGtidExecutedFlavorCapability //
InstantDDLFlavorCapability // ALGORITHM=INSTANT general support
InstantAddLastColumnFlavorCapability //
InstantAddDropVirtualColumnFlavorCapability //
InstantAddDropColumnFlavorCapability // Adding/dropping column in any position/ordinal.
InstantChangeColumnDefaultFlavorCapability //
InstantExpandEnumCapability //
InstantChangeColumnVisibilityCapability //
MySQLUpgradeInServerFlavorCapability //
DynamicRedoLogCapacityFlavorCapability // supported in MySQL 8.0.30 and above: https://dev.mysql.com/doc/relnotes/mysql/8.0/en/news-8-0-30.html
DisableRedoLogFlavorCapability // supported in MySQL 8.0.21 and above: https://dev.mysql.com/doc/relnotes/mysql/8.0/en/news-8-0-21.html
CheckConstraintsCapability // supported in MySQL 8.0.16 and above: https://dev.mysql.com/doc/relnotes/mysql/8.0/en/news-8-0-16.html
PerformanceSchemaDataLocksTableCapability // supported in MySQL 8.0.1 and above: https://dev.mysql.com/doc/relnotes/mysql/8.0/en/news-8-0-1.html
InstantDDLXtrabackupCapability // Supported in 8.0.32 and above, solving a MySQL-vs-Xtrabackup bug starting 8.0.29
ReplicaTerminologyCapability // Supported in 8.0.26 and above, using SHOW REPLICA STATUS and all variations.
BinaryLogStatus // Supported in 8.2.0 and above, uses SHOW BINARY LOG STATUS
RestrictFKOnNonStandardKey // Supported in 8.4.0 and above, restricts usage of non-standard indexes for foreign keys.
NoneFlavorCapability FlavorCapability = iota // default placeholder
FastDropTableFlavorCapability // supported in MySQL 8.0.23 and above: https://dev.mysql.com/doc/relnotes/mysql/8.0/en/news-8-0-23.html
TransactionalGtidExecutedFlavorCapability //
InstantDDLFlavorCapability // ALGORITHM=INSTANT general support
InstantAddLastColumnFlavorCapability //
InstantAddDropVirtualColumnFlavorCapability //
InstantAddDropColumnFlavorCapability // Adding/dropping column in any position/ordinal.
InstantChangeColumnDefaultFlavorCapability //
InstantExpandEnumCapability //
InstantChangeColumnVisibilityCapability //
MySQLUpgradeInServerFlavorCapability //
DynamicRedoLogCapacityFlavorCapability // supported in MySQL 8.0.30 and above: https://dev.mysql.com/doc/relnotes/mysql/8.0/en/news-8-0-30.html
DisableRedoLogFlavorCapability // supported in MySQL 8.0.21 and above: https://dev.mysql.com/doc/relnotes/mysql/8.0/en/news-8-0-21.html
CheckConstraintsCapability // supported in MySQL 8.0.16 and above: https://dev.mysql.com/doc/relnotes/mysql/8.0/en/news-8-0-16.html
PerformanceSchemaDataLocksTableCapability // supported in MySQL 8.0.1 and above: https://dev.mysql.com/doc/relnotes/mysql/8.0/en/news-8-0-1.html
PerformanceSchemaMetadataLocksTableCapability // supported in MySQL 8.0.2 and above: https://dev.mysql.com/doc/relnotes/mysql/8.0/en/news-8-0-2.html
InstantDDLXtrabackupCapability // Supported in 8.0.32 and above, solving a MySQL-vs-Xtrabackup bug starting 8.0.29
ReplicaTerminologyCapability // Supported in 8.0.26 and above, using SHOW REPLICA STATUS and all variations.
BinaryLogStatus // Supported in 8.2.0 and above, uses SHOW BINARY LOG STATUS
RestrictFKOnNonStandardKey // Supported in 8.4.0 and above, restricts usage of non-standard indexes for foreign keys.
)

type CapableOf func(capability FlavorCapability) (bool, error)
Expand Down Expand Up @@ -97,6 +98,8 @@ func MySQLVersionHasCapability(serverVersion string, capability FlavorCapability
return atLeast(8, 0, 0)
case PerformanceSchemaDataLocksTableCapability:
return atLeast(8, 0, 1)
case PerformanceSchemaMetadataLocksTableCapability:
return atLeast(8, 0, 2)
case MySQLUpgradeInServerFlavorCapability:
return atLeast(8, 0, 16)
case CheckConstraintsCapability:
Expand Down
19 changes: 19 additions & 0 deletions go/mysql/capabilities/capability_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -218,6 +218,25 @@ func TestMySQLVersionCapableOf(t *testing.T) {
version: "8.0.20",
capability: PerformanceSchemaDataLocksTableCapability,
isCapable: true,
}, {
version: "5.7.38",
capability: PerformanceSchemaMetadataLocksTableCapability,
isCapable: false,
},
{
version: "8.0",
capability: PerformanceSchemaMetadataLocksTableCapability,
isCapable: false,
},
{
version: "8.0.1",
capability: PerformanceSchemaMetadataLocksTableCapability,
isCapable: false,
},
{
version: "8.0.2",
capability: PerformanceSchemaMetadataLocksTableCapability,
isCapable: true,
},
{
version: "8.0.29",
Expand Down
9 changes: 9 additions & 0 deletions go/mysql/flavor_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -102,6 +102,15 @@ func TestServerVersionCapableOf(t *testing.T) {
version: "8.0.20",
capability: capabilities.PerformanceSchemaDataLocksTableCapability,
isCapable: true,
}, {
version: "5.7.38",
capability: capabilities.PerformanceSchemaMetadataLocksTableCapability,
isCapable: false,
},
{
version: "8.0.20",
capability: capabilities.PerformanceSchemaMetadataLocksTableCapability,
isCapable: true,
},
{
// Some ridiculous version
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -714,6 +714,89 @@ func testScheduler(t *testing.T) {
}
})
})
t.Run("force_cutover mdl", func(t *testing.T) {
ctx, cancel := context.WithTimeout(context.Background(), extendedWaitTime*5)
defer cancel()

t1uuid = testOnlineDDLStatement(t, createParams(trivialAlterT1Statement, ddlStrategy+" --postpone-completion", "vtgate", "", "", true)) // skip wait

t.Run("wait for t1 running", func(t *testing.T) {
status := onlineddl.WaitForMigrationStatus(t, &vtParams, shards, t1uuid, normalWaitTime, schema.OnlineDDLStatusRunning)
fmt.Printf("# Migration status (for debug purposes): <%s>\n", status)
})
t.Run("wait for t1 ready to complete", func(t *testing.T) {
// Waiting for 'running', above, is not enough. We want to let vreplication a chance to start running, or else
// we attempt the cut-over too early. Specifically in this test, we're going to lock rows FOR UPDATE, which,
// if vreplication does not get the chance to start, will prevent it from doing anything at all.
// ready_to_complete is a great signal for us that vreplication is healthy and up to date.
waitForReadyToComplete(t, t1uuid, true)
})

conn, err := primaryTablet.VttabletProcess.TabletConn(keyspaceName, true)
require.NoError(t, err)
defer conn.Close()

unlockTables := func() error {
_, err := conn.ExecuteFetch("unlock tables", 0, false)
return err
}
t.Run("locking table", func(t *testing.T) {
_, err := conn.ExecuteFetch("lock tables t1_test write", 0, false)
require.NoError(t, err)
})
defer unlockTables()
t.Run("injecting heartbeats asynchronously", func(t *testing.T) {
go func() {
ticker := time.NewTicker(time.Second)
defer ticker.Stop()
for {
throttler.CheckThrottler(clusterInstance, primaryTablet, throttlerapp.OnlineDDLName, nil)
select {
case <-ticker.C:
case <-ctx.Done():
return
}
}
}()
})
t.Run("check no force_cutover", func(t *testing.T) {
rs := onlineddl.ReadMigrations(t, &vtParams, t1uuid)
require.NotNil(t, rs)
for _, row := range rs.Named().Rows {
forceCutOver := row.AsInt64("force_cutover", 0)
assert.Equal(t, int64(0), forceCutOver) // disabled
}
})
t.Run("attempt to complete", func(t *testing.T) {
onlineddl.CheckCompleteMigration(t, &vtParams, shards, t1uuid, true)
})
t.Run("cut-over fail due to timeout", func(t *testing.T) {
waitForMessage(t, t1uuid, "(errno 3024) (sqlstate HY000): Query execution was interrupted, maximum statement execution time exceeded")
status := onlineddl.WaitForMigrationStatus(t, &vtParams, shards, t1uuid, normalWaitTime, schema.OnlineDDLStatusComplete, schema.OnlineDDLStatusFailed, schema.OnlineDDLStatusRunning)
fmt.Printf("# Migration status (for debug purposes): <%s>\n", status)
onlineddl.CheckMigrationStatus(t, &vtParams, shards, t1uuid, schema.OnlineDDLStatusRunning)
})
t.Run("force_cutover", func(t *testing.T) {
onlineddl.CheckForceMigrationCutOver(t, &vtParams, shards, t1uuid, true)
})
t.Run("check force_cutover", func(t *testing.T) {
rs := onlineddl.ReadMigrations(t, &vtParams, t1uuid)
require.NotNil(t, rs)
for _, row := range rs.Named().Rows {
forceCutOver := row.AsInt64("force_cutover", 0)
assert.Equal(t, int64(1), forceCutOver) // enabled
}
})
t.Run("expect completion", func(t *testing.T) {
status := onlineddl.WaitForMigrationStatus(t, &vtParams, shards, t1uuid, normalWaitTime, schema.OnlineDDLStatusComplete, schema.OnlineDDLStatusFailed)
fmt.Printf("# Migration status (for debug purposes): <%s>\n", status)
onlineddl.CheckMigrationStatus(t, &vtParams, shards, t1uuid, schema.OnlineDDLStatusComplete)
})
t.Run("expect unlock failure", func(t *testing.T) {
err := unlockTables()
assert.ErrorContains(t, err, "broken pipe")
})
})
}
t.Run("ALTER both tables non-concurrent", func(t *testing.T) {
t1uuid = testOnlineDDLStatement(t, createParams(trivialAlterT1Statement, ddlStrategy, "vtgate", "", "", true)) // skip wait
Expand Down
55 changes: 31 additions & 24 deletions go/vt/vttablet/onlineddl/executor.go
Original file line number Diff line number Diff line change
Expand Up @@ -838,34 +838,41 @@ func (e *Executor) killTableLockHoldersAndAccessors(ctx context.Context, tableNa
}
}
capableOf := mysql.ServerVersionCapableOf(conn.ServerVersion)
capable, err := capableOf(capabilities.PerformanceSchemaDataLocksTableCapability)
if err != nil {
return err
}
if capable {
{
// Kill connections that have open transactions locking the table. These potentially (probably?) are not
// actively running a query on our table. They're doing other things while holding locks on our table.
query, err := sqlparser.ParseAndBind(sqlProcessWithLocksOnTable, sqltypes.StringBindVariable(tableName))
if err != nil {
return err
}
rs, err := conn.Conn.ExecuteFetch(query, -1, true)
terminateTransactions := func(capability capabilities.FlavorCapability, query string, column string, description string) error {
capable, err := capableOf(capability)
if err != nil {
return err
}
if !capable {
return nil
}
query, err = sqlparser.ParseAndBind(query, sqltypes.StringBindVariable(tableName))
if err != nil {
return err
}
rs, err := conn.Conn.ExecuteFetch(query, -1, true)
if err != nil {
return vterrors.Wrapf(err, "finding transactions locking table `%s` %s", tableName, description)
}
log.Infof("terminateTransactions: found %v transactions locking table `%s` %s", len(rs.Rows), tableName, description)
for _, row := range rs.Named().Rows {
threadId := row.AsInt64(column, 0)
log.Infof("terminateTransactions: killing connection %v with transaction locking table `%s` %s", threadId, tableName, description)
killConnection := fmt.Sprintf("KILL %d", threadId)
_, err = conn.Conn.ExecuteFetch(killConnection, 1, false)
if err != nil {
return vterrors.Wrapf(err, "finding transactions locking table")
}
log.Infof("killTableLockHoldersAndAccessors: found %v locking transactions", len(rs.Rows))
for _, row := range rs.Named().Rows {
threadId := row.AsInt64("trx_mysql_thread_id", 0)
log.Infof("killTableLockHoldersAndAccessors: killing connection %v with transaction on table", threadId)
killConnection := fmt.Sprintf("KILL %d", threadId)
_, err = conn.Conn.ExecuteFetch(killConnection, 1, false)
if err != nil {
log.Errorf("Unable to kill the connection %d: %v", threadId, err)
}
log.Errorf("terminateTransactions: unable to kill the connection %d locking table `%s` %s: %v", threadId, tableName, description, err)
}
}
return nil
}
if err := terminateTransactions(capabilities.PerformanceSchemaDataLocksTableCapability, sqlProcessWithLocksOnTable, "trx_mysql_thread_id", "data"); err != nil {
return err
}
if err := terminateTransactions(capabilities.PerformanceSchemaMetadataLocksTableCapability, sqlProcessWithMetadataLocksOnTable, "processlist_id", "metadata"); err != nil {
return err
}

return nil
}

Expand Down
9 changes: 9 additions & 0 deletions go/vt/vttablet/onlineddl/schema.go
Original file line number Diff line number Diff line change
Expand Up @@ -567,6 +567,15 @@ const (
where
data_locks.OBJECT_SCHEMA=database() AND data_locks.OBJECT_NAME=%a
`
sqlProcessWithMetadataLocksOnTable = `
SELECT
DISTINCT threads.processlist_id
from
performance_schema.metadata_locks
join performance_schema.threads on (metadata_locks.OWNER_THREAD_ID=threads.THREAD_ID)
where
metadata_locks.OBJECT_SCHEMA=database() AND metadata_locks.OBJECT_NAME=%a
`
)

var (
Expand Down