vitess Online DDL atomic cut-over #11460

Merged
24 commits
cb02fd6
cut-over with sentry table
shlomi-noach Oct 1, 2022
908631b
wait for rename via channel; write extra transaction post LOCK
shlomi-noach Oct 3, 2022
cc818db
add stage info
shlomi-noach Oct 11, 2022
288007a
reduced wait-for-pos timeout. Improved stage logic
shlomi-noach Oct 11, 2022
474c95f
Merge branch 'main' into vrepl-online-ddl-cut-over-atomic-with-sentry…
shlomi-noach Oct 11, 2022
b0c7862
cleanup
shlomi-noach Oct 11, 2022
c0bfe63
more cleanup
shlomi-noach Oct 11, 2022
6abb9c0
even more cleanup
shlomi-noach Oct 11, 2022
faad587
context timeout
shlomi-noach Oct 11, 2022
76dfcd5
preserve stage by disabling deferred stage changes
shlomi-noach Oct 11, 2022
e15fa21
killing rename query upon error
shlomi-noach Oct 11, 2022
795c27c
increase test timeout
shlomi-noach Oct 11, 2022
d3ef3b1
fix defer ordering
shlomi-noach Oct 11, 2022
d4aed59
log.info
shlomi-noach Oct 11, 2022
d2f4bdf
Merge branch 'main' into vrepl-online-ddl-cut-over-atomic-with-sentry…
shlomi-noach Oct 12, 2022
769507e
add and populate cutover_attempts column
shlomi-noach Oct 12, 2022
7bf25ff
Merge branch 'main' into vrepl-online-ddl-cut-over-atomic-with-sentry…
shlomi-noach Oct 13, 2022
aa2a538
Merge branch 'main' into vrepl-online-ddl-cut-over-atomic-with-sentry…
shlomi-noach Oct 18, 2022
9f8d31b
Merge branch 'main' into vrepl-online-ddl-cut-over-atomic-with-sentry…
shlomi-noach Oct 20, 2022
450eeb3
merge main, resolve conflict
shlomi-noach Nov 7, 2022
e53f030
search PROCESSLIST with LIKE
shlomi-noach Nov 8, 2022
a41a940
code comment
shlomi-noach Nov 8, 2022
123a376
making a variable more local
shlomi-noach Nov 8, 2022
ad548fe
literal string
shlomi-noach Nov 8, 2022
@@ -140,6 +140,7 @@ const (
maxConcurrency = 20
singleConnectionSleepInterval = 2 * time.Millisecond
countIterations = 5
migrationWaitTimeout = 60 * time.Second
)

func resetOpOrder() {
@@ -344,7 +345,7 @@ func testOnlineDDLStatement(t *testing.T, alterStatement string, ddlStrategy str
assert.NoError(t, err)

if !strategySetting.Strategy.IsDirect() {
status := onlineddl.WaitForMigrationStatus(t, &vtParams, shards, uuid, 30*time.Second, schema.OnlineDDLStatusComplete, schema.OnlineDDLStatusFailed)
status := onlineddl.WaitForMigrationStatus(t, &vtParams, shards, uuid, migrationWaitTimeout, schema.OnlineDDLStatusComplete, schema.OnlineDDLStatusFailed)
fmt.Printf("# Migration status (for debug purposes): <%s>\n", status)
}

249 changes: 194 additions & 55 deletions go/vt/vttablet/onlineddl/executor.go
@@ -650,6 +650,22 @@ func (e *Executor) executeDirectly(ctx context.Context, onlineDDL *schema.Online
return acceptableErrorCodeFound, nil
}

// doesConnectionInfoMatch checks if there's a MySQL connection in PROCESSLIST whose Info matches the given text
func (e *Executor) doesConnectionInfoMatch(ctx context.Context, connID int64, submatch string) (bool, error) {
findProcessQuery, err := sqlparser.ParseAndBind(sqlFindProcess,
sqltypes.Int64BindVariable(connID),
sqltypes.StringBindVariable("%"+submatch+"%"),
)
if err != nil {
return false, err
}
rs, err := e.execQuery(ctx, findProcessQuery)
if err != nil {
return false, err
}
return len(rs.Rows) == 1, nil
}
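
The sqlFindProcess template used above is defined elsewhere in the PR and does not appear in this hunk. Judging from the two bind variables (a connection id and a LIKE pattern) and the "search PROCESSLIST with LIKE" commit, a plausible shape, offered purely as an illustrative assumption, is:

// Illustrative assumption only; the actual constant lives in the onlineddl
// SQL definitions and may differ in column list or formatting.
const sqlFindProcess = `SELECT id, Info FROM information_schema.processlist WHERE id=%a AND Info LIKE %a`

Requiring exactly one matching row (len(rs.Rows) == 1) distinguishes a genuinely found query from both a missing and an ambiguous match.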

// validateTableForAlterAction checks whether a table is good to undergo an ALTER operation. It returns a detailed error if not.
func (e *Executor) validateTableForAlterAction(ctx context.Context, onlineDDL *schema.OnlineDDL) (err error) {
// Validate table does not participate in foreign key relationship:
@@ -718,6 +734,10 @@ func (e *Executor) terminateVReplMigration(ctx context.Context, uuid string) err

// cutOverVReplMigration stops vreplication, then removes the _vt.vreplication entry for the given migration
func (e *Executor) cutOverVReplMigration(ctx context.Context, s *VReplStream) error {
if err := e.incrementCutoverAttempts(ctx, s.workflow); err != nil {
return err
}

tmClient := e.tabletManagerClient()
defer tmClient.Close()

@@ -739,31 +759,101 @@ func (e *Executor) cutOverVReplMigration(ctx context.Context, s *VReplStream) er
return err
}
isVreplicationTestSuite := onlineDDL.StrategySetting().IsVreplicationTestSuite()
e.updateMigrationStage(ctx, onlineDDL.UUID, "starting cut-over")
Review comment (Contributor): This can fail for a variety of reasons, e.g. deadlock. We should have some error handling and perhaps retry logic. Or am I missing something?

Reply (Contributor Author): This is nice-to-have but not strictly critical for the cut-over. It adds a level of auditing/logging. Not sure we should fail the cut-over over failure of this action.
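
A minimal sketch of the best-effort approach described in the reply, assuming stage updates are auditing only; the helper name is hypothetical and not part of this PR:

// Hypothetical helper, for illustration only: record the stage when possible,
// log the failure otherwise, and never abort the cut-over because auditing failed.
func (e *Executor) auditStageBestEffort(ctx context.Context, uuid string, stage string, args ...interface{}) {
	if err := e.updateMigrationStage(ctx, uuid, stage, args...); err != nil {
		log.Warningf("failed to record stage for migration %s: %v", uuid, err)
	}
}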


var sentryTableName string

waitForPos := func(s *VReplStream, pos mysql.Position) error {
ctx, cancel := context.WithTimeout(ctx, vreplicationCutOverThreshold)
defer cancel()
// Wait for target to reach the up-to-date pos
if err := tmClient.VReplicationWaitForPos(ctx, tablet.Tablet, int(s.id), mysql.EncodePosition(pos)); err != nil {
return err
}
// Target is now in sync with source!
return nil
}

// A bit early on, we generate names for stowaway and temporary tables
// We do this here because right now we're in a safe place where nothing happened yet. If there's an error now, bail out
// and no harm done.
// Later on, when traffic is blocked and tables renamed, that's a more dangerous place to be in; we want as little logic
// in that place as possible.
var stowawayTableName string
if !isVreplicationTestSuite {
stowawayTableName, err = schema.GenerateGCTableName(schema.HoldTableGCState, newGCTableRetainTime())
// A bit early on, we generate names for stowaway and temporary tables
// We do this here because right now we're in a safe place where nothing happened yet. If there's an error now, bail out
// and no harm done.
// Later on, when traffic is blocked and tables renamed, that's a more dangerous place to be in; we want as little logic
// in that place as possible.
sentryTableName, err = schema.GenerateGCTableName(schema.HoldTableGCState, newGCTableRetainTime())
if err != nil {
return err
}

// We create the sentry table before toggling writes, because this involves a WaitForPos, which takes some time. We
// don't want to overload the buffering time with this excessive wait.

if err := e.updateArtifacts(ctx, onlineDDL.UUID, sentryTableName); err != nil {
return err
}
// Audit stowawayTableName. If operation is complete, we remove the audit. But if this tablet fails while
// the original table is renamed (into stowaway table), then this will be both the evidence and the information we need
// to restore the table back into existence. This can (and will) be done by a different vttablet process
if err := e.updateMigrationStowawayTable(ctx, onlineDDL.UUID, stowawayTableName); err != nil {
parsed := sqlparser.BuildParsedQuery(sqlCreateSentryTable, sentryTableName)
if _, err := e.execQuery(ctx, parsed.Query); err != nil {
return err
}
e.updateMigrationStage(ctx, onlineDDL.UUID, "sentry table created: %s", sentryTableName)

postSentryPos, err := e.primaryPosition(ctx)
if err != nil {
return err
}
defer e.updateMigrationStowawayTable(ctx, onlineDDL.UUID, "")
e.updateMigrationStage(ctx, onlineDDL.UUID, "waiting for post-sentry pos: %v", mysql.EncodePosition(postSentryPos))
if err := waitForPos(s, postSentryPos); err != nil {
return err
}
e.updateMigrationStage(ctx, onlineDDL.UUID, "post-sentry pos reached")
}

lockConn, err := e.pool.Get(ctx, nil)
if err != nil {
return err
}
defer lockConn.Recycle()
defer lockConn.Exec(ctx, sqlUnlockTables, 1, false)

renameConn, err := e.pool.Get(ctx, nil)
if err != nil {
return err
}
defer renameConn.Recycle()
defer renameConn.Kill("premature exit while renaming tables", 0)
renameQuery := sqlparser.BuildParsedQuery(sqlSwapTables, onlineDDL.Table, sentryTableName, vreplTable, onlineDDL.Table, sentryTableName, vreplTable)

waitForRenameProcess := func() error {
// This function waits until it finds the RENAME TABLE... query running in MySQL's PROCESSLIST, or until timeout
// The function assumes that one of the renamed tables is locked, thus causing the RENAME to block. If nothing
// is locked, then the RENAME will be near-instantaneous and it's unlikely that the function will find it.
renameWaitCtx, cancel := context.WithTimeout(ctx, vreplicationCutOverThreshold)
defer cancel()

for {
renameProcessFound, err := e.doesConnectionInfoMatch(renameWaitCtx, renameConn.ID(), "rename")
if err != nil {
return err
}
if renameProcessFound {
return nil
}
select {
case <-renameWaitCtx.Done():
return vterrors.Errorf(vtrpcpb.Code_ABORTED, "timeout for rename query: %s", renameQuery.Query)
case <-time.After(time.Second):
// sleep
}
}
}

renameCompleteChan := make(chan error)

bufferingCtx, bufferingContextCancel := context.WithCancel(ctx)
defer bufferingContextCancel()
// Preparation is complete. We proceed to cut-over.
toggleBuffering := func(bufferQueries bool) error {
log.Infof("toggling buffering: %t in migration %v", bufferQueries, onlineDDL.UUID)
e.toggleBufferTableFunc(bufferingCtx, onlineDDL.Table, bufferQueries)
if !bufferQueries {
// called after new table is in place.
@@ -774,29 +864,32 @@ func (e *Executor) cutOverVReplMigration(ctx context.Context, s *VReplStream) er
return err
}
}
log.Infof("toggled buffering: %t in migration %v", bufferQueries, onlineDDL.UUID)
return nil
}

var reenableOnce sync.Once
reenableWritesOnce := func() {
reenableOnce.Do(func() {
log.Infof("re-enabling writes in migration %v", onlineDDL.UUID)
toggleBuffering(false)
go log.Infof("cutOverVReplMigration %v: unbuffered queries", s.workflow)
})
}
go log.Infof("cutOverVReplMigration %v: buffering queries", s.workflow)
e.updateMigrationStage(ctx, onlineDDL.UUID, "buffering queries")
// stop writes on source:
err = toggleBuffering(true)
defer reenableWritesOnce()
if err != nil {
return err
}

// swap out the table
// Give a fraction of a second for a scenario where a query is in
// query executor, it passed the ACLs and is _about to_ execute. This will be nicer to those queries:
// they will be able to complete before the rename, rather than block briefly on the rename only to find
// the table no longer exists.
e.updateMigrationStage(ctx, onlineDDL.UUID, "graceful wait for buffering")
time.Sleep(100 * time.Millisecond)

if isVreplicationTestSuite {
// The testing suite may inject queries internally from the server via a recurring EVENT.
// Those queries are unaffected by query rules (ACLs) because they don't go through Vitess.
@@ -807,32 +900,41 @@ func (e *Executor) cutOverVReplMigration(ctx context.Context, s *VReplStream) er
if _, err := e.execQuery(ctx, parsed.Query); err != nil {
return err
}
e.updateMigrationStage(ctx, onlineDDL.UUID, "test suite 'before' table renamed")
} else {
// real production
go log.Infof("cutOverVReplMigration %v: renaming table %v to %v", s.workflow, onlineDDL.Table, stowawayTableName)
parsed := sqlparser.BuildParsedQuery(sqlRenameTable, onlineDDL.Table, stowawayTableName)
if _, err := e.execQuery(ctx, parsed.Query); err != nil {

e.updateMigrationStage(ctx, onlineDDL.UUID, "locking tables")
lockCtx, cancel := context.WithTimeout(ctx, vreplicationCutOverThreshold)
defer cancel()
lockTableQuery := sqlparser.BuildParsedQuery(sqlLockTwoTablesWrite, sentryTableName, onlineDDL.Table)
if _, err := lockConn.Exec(lockCtx, lockTableQuery.Query, 1, false); err != nil {
return err
}
}

// We have just created a gaping hole, the original table does not exist.
// we expect to fill that hole by swapping in the vrepl table. But if anything goes wrong we prepare
// to rename the table back:
defer func() {
if _, err := e.renameTableIfApplicable(ctx, stowawayTableName, onlineDDL.Table); err != nil {
vterrors.Errorf(vtrpcpb.Code_UNKNOWN, "cannot rename back swapped table: %v into %v: %v", stowawayTableName, onlineDDL.Table, err)
e.updateMigrationStage(ctx, onlineDDL.UUID, "renaming tables")
go func() {
_, err := renameConn.Exec(ctx, renameQuery.Query, 1, false)
renameCompleteChan <- err
}()
// the rename should block, because of the LOCK. Wait for it to show up.
e.updateMigrationStage(ctx, onlineDDL.UUID, "waiting for RENAME to block")
if err := waitForRenameProcess(); err != nil {
return err
}
go log.Infof("cutOverVReplMigration %v: restored table %v back to %v", s.workflow, stowawayTableName, onlineDDL.Table)
}()
// Right now: new queries are buffered, any existing query will have executed, and worst case scenario is
// that some leftover query finds the table is not actually there anymore...
// In any case, there are definitely no more writes to the table since it does not exist. We can
// safely take the (GTID) pos now.
e.updateMigrationStage(ctx, onlineDDL.UUID, "RENAME found")
}

e.updateMigrationStage(ctx, onlineDDL.UUID, "reading post-lock pos")
postWritesPos, err := e.primaryPosition(ctx)
if err != nil {
return err
}

// Right now: new queries are buffered, any existing query will have executed, and worst case scenario is
// that some leftover query finds the table is not actually there anymore...
// In any case, there are definitely no more writes to the table since it does not exist. We can
// safely take the (GTID) pos now.
_ = e.updateMigrationTimestamp(ctx, "liveness_timestamp", s.workflow)

// Writes are now disabled on table. Read up-to-date vreplication info, specifically to get latest (and fixed) pos:
@@ -841,22 +943,14 @@ func (e *Executor) cutOverVReplMigration(ctx context.Context, s *VReplStream) er
return err
}

waitForPos := func() error {
ctx, cancel := context.WithTimeout(ctx, 2*vreplicationCutOverThreshold)
defer cancel()
// Wait for target to reach the up-to-date pos
if err := tmClient.VReplicationWaitForPos(ctx, tablet.Tablet, int(s.id), mysql.EncodePosition(postWritesPos)); err != nil {
return err
}
// Target is now in sync with source!
return nil
}
go log.Infof("cutOverVReplMigration %v: waiting for position %v", s.workflow, mysql.EncodePosition(postWritesPos))
if err := waitForPos(); err != nil {
e.updateMigrationStage(ctx, onlineDDL.UUID, "waiting for post-lock pos: %v", mysql.EncodePosition(postWritesPos))
if err := waitForPos(s, postWritesPos); err != nil {
e.updateMigrationStage(ctx, onlineDDL.UUID, "timeout while waiting for post-lock pos: %v", err)
return err
}
go log.Infof("cutOverVReplMigration %v: done waiting for position %v", s.workflow, mysql.EncodePosition(postWritesPos))
// Stop vreplication
e.updateMigrationStage(ctx, onlineDDL.UUID, "stopping vreplication")
if _, err := e.vreplicationExec(ctx, tablet.Tablet, binlogplayer.StopVReplication(uint32(s.id), "stopped for online DDL cutover")); err != nil {
return err
}
@@ -871,24 +965,43 @@ func (e *Executor) cutOverVReplMigration(ctx context.Context, s *VReplStream) er
if _, err := e.execQuery(ctx, parsed.Query); err != nil {
return err
}
e.updateMigrationStage(ctx, onlineDDL.UUID, "test suite 'after' table renamed")
} else {
// Normal (non-testing) alter table
conn, err := dbconnpool.NewDBConnection(ctx, e.env.Config().DB.DbaWithDB())
if err != nil {
e.updateMigrationStage(ctx, onlineDDL.UUID, "validating rename is still in place")
if err := waitForRenameProcess(); err != nil {
return err
}
defer conn.Close()

parsed := sqlparser.BuildParsedQuery(sqlRenameTwoTables,
vreplTable, onlineDDL.Table,
stowawayTableName, vreplTable,
)
go log.Infof("cutOverVReplMigration %v: switch of tables %v, %v, %v", s.workflow, vreplTable, onlineDDL.Table, stowawayTableName)
if _, err := e.execQuery(ctx, parsed.Query); err != nil {
return err
// Normal (non-testing) alter table
e.updateMigrationStage(ctx, onlineDDL.UUID, "dropping sentry table")

{
dropTableQuery := sqlparser.BuildParsedQuery(sqlDropTable, sentryTableName)
lockCtx, cancel := context.WithTimeout(ctx, vreplicationCutOverThreshold)
defer cancel()
if _, err := lockConn.Exec(lockCtx, dropTableQuery.Query, 1, false); err != nil {
return err
}
}
{
lockCtx, cancel := context.WithTimeout(ctx, vreplicationCutOverThreshold)
defer cancel()
e.updateMigrationStage(ctx, onlineDDL.UUID, "unlocking tables")
if _, err := lockConn.Exec(lockCtx, sqlUnlockTables, 1, false); err != nil {
return err
}
}
{
lockCtx, cancel := context.WithTimeout(ctx, vreplicationCutOverThreshold)
defer cancel()
e.updateMigrationStage(lockCtx, onlineDDL.UUID, "waiting for RENAME to complete")
if err := <-renameCompleteChan; err != nil {
return err
}
}
}
}
e.updateMigrationStage(ctx, onlineDDL.UUID, "cut-over complete")
e.ownedRunningMigrations.Delete(onlineDDL.UUID)

go func() {
@@ -903,13 +1016,13 @@ func (e *Executor) cutOverVReplMigration(ctx context.Context, s *VReplStream) er
}()

// Tables are now swapped! Migration is successful
e.updateMigrationStage(ctx, onlineDDL.UUID, "re-enabling writes")
reenableWritesOnce() // this function is also deferred, in case of early return; but now would be a good time to resume writes, before we publish the migration as "complete"
go log.Infof("cutOverVReplMigration %v: marking as complete", s.workflow)
_ = e.onSchemaMigrationStatus(ctx, onlineDDL.UUID, schema.OnlineDDLStatusComplete, false, progressPctFull, etaSecondsNow, s.rowsCopied, emptyHint)
return nil

// deferred function will re-enable writes now
// deferred function will unlock keyspace
}
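
For orientation, the cut-over sequence implemented above can be summarized as follows. The outline below is illustrative only (it is not part of the PR); the real logic lives in cutOverVReplMigration:

// Illustrative outline of the atomic cut-over flow introduced by this PR.
// Not part of the change itself; the steps refer to the code above.
func atomicCutOverOutline() {
	// 1. Create a sentry (HOLD) table, record it as an artifact, and wait for
	//    vreplication to reach the post-sentry position, so the copy is near-current.
	// 2. Start buffering queries against the migrated table.
	// 3. On the lock connection: LOCK TABLES sentry WRITE, original WRITE.
	// 4. On a second connection: issue the three-way RENAME (original to sentry name,
	//    vrepl to original, sentry name to vrepl name). It blocks on the lock;
	//    confirm it is blocked by finding it in PROCESSLIST.
	// 5. Read the post-lock primary position, wait for vreplication to catch up,
	//    then stop vreplication: the vrepl table is now fully in sync.
	// 6. DROP the sentry table on the lock connection, then UNLOCK TABLES; the
	//    pending RENAME executes atomically, swapping the vrepl table into place.
	// 7. Re-enable writes (stop buffering) and mark the migration complete.
}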

// initMigrationSQLMode sets sql_mode according to DDL strategy, and returns a function that
@@ -3344,6 +3457,7 @@ func (e *Executor) reviewRunningMigrations(ctx context.Context) (countRunnning i
if isReady {
if err := e.cutOverVReplMigration(ctx, s); err != nil {
_ = e.updateMigrationMessage(ctx, uuid, err.Error())
log.Errorf("cutOverVReplMigration failed: err=%v", err)
if merr, ok := err.(*mysql.SQLError); ok {
switch merr.Num {
case mysql.ERTooLongIdent:
@@ -3780,6 +3894,31 @@ func (e *Executor) updateMigrationSpecialPlan(ctx context.Context, uuid string,
return err
}

func (e *Executor) updateMigrationStage(ctx context.Context, uuid string, stage string, args ...interface{}) error {
msg := fmt.Sprintf(stage, args...)
log.Infof("updateMigrationStage: uuid=%s, stage=%s", uuid, msg)
query, err := sqlparser.ParseAndBind(sqlUpdateStage,
sqltypes.StringBindVariable(msg),
sqltypes.StringBindVariable(uuid),
)
if err != nil {
return err
}
_, err = e.execQuery(ctx, query)
Review comment (Contributor): This is the error that we're not handling.

Reply (Contributor Author): To summarize, we're ignoring it where it doesn't have a true effect on the cut-over, as we consider it more of a logging/debuggability feature.

return err
}

func (e *Executor) incrementCutoverAttempts(ctx context.Context, uuid string) error {
query, err := sqlparser.ParseAndBind(sqlIncrementCutoverAttempts,
sqltypes.StringBindVariable(uuid),
)
if err != nil {
return err
}
_, err = e.execQuery(ctx, query)
return err
}
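
As with sqlUpdateStage, the sqlIncrementCutoverAttempts template is defined in the PR's SQL constants and not shown in this hunk; a plausible shape, stated only as an assumption, is:

// Illustrative assumption only; the actual constant may differ.
const sqlIncrementCutoverAttempts = `UPDATE _vt.schema_migrations
	SET cutover_attempts=cutover_attempts+1
	WHERE migration_uuid=%a`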

// updateMigrationTablet sets 'tablet' column to be this executor's tablet alias for given migration
func (e *Executor) updateMigrationTablet(ctx context.Context, uuid string) error {
query, err := sqlparser.ParseAndBind(sqlUpdateTablet,