Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

an atomic cut-over implementation #84

Merged
merged 1 commit into from
Jun 28, 2016
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion build.sh
Original file line number Diff line number Diff line change
@@ -1,7 +1,7 @@
#!/bin/bash
#
#
RELEASE_VERSION="0.9.7"
RELEASE_VERSION="0.9.8"

buildpath=/tmp/gh-ost
target=gh-ost
Expand Down
3 changes: 2 additions & 1 deletion go/base/context.go
Original file line number Diff line number Diff line change
Expand Up @@ -30,7 +30,8 @@ const (
type CutOver int

const (
CutOverSafe CutOver = iota
CutOverAtomic CutOver = iota
CutOverSafe = iota
CutOverTwoStep = iota
)

Expand Down
6 changes: 4 additions & 2 deletions go/cmd/gh-ost/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -66,7 +66,7 @@ func main() {
flag.BoolVar(&migrationContext.OkToDropTable, "ok-to-drop-table", false, "Shall the tool drop the old table at end of operation. DROPping tables can be a long locking operation, which is why I'm not doing it by default. I'm an online tool, yes?")
flag.BoolVar(&migrationContext.InitiallyDropOldTable, "initially-drop-old-table", false, "Drop a possibly existing OLD table (remains from a previous run?) before beginning operation. Default is to panic and abort if such table exists")
flag.BoolVar(&migrationContext.InitiallyDropGhostTable, "initially-drop-ghost-table", false, "Drop a possibly existing Ghost table (remains from a previous run?) before beginning operation. Default is to panic and abort if such table exists")
cutOver := flag.String("cut-over", "", "(mandatory) choose cut-over type (two-step, voluntary-lock)")
cutOver := flag.String("cut-over", "atomic", "choose cut-over type (atomic, two-step, voluntary-lock)")

flag.BoolVar(&migrationContext.SwitchToRowBinlogFormat, "switch-to-rbr", false, "let this tool automatically switch binary log format to 'ROW' on the replica, if needed. The format will NOT be switched back. I'm too scared to do that, and wish to protect you if you happen to execute another migration while this one is running")
chunkSize := flag.Int64("chunk-size", 1000, "amount of rows to handle in each iteration (allowed range: 100-100,000)")
Expand Down Expand Up @@ -144,7 +144,9 @@ func main() {
}

switch *cutOver {
case "safe", "default", "":
case "atomic", "default", "":
migrationContext.CutOverType = base.CutOverAtomic
case "safe":
migrationContext.CutOverType = base.CutOverSafe
case "two-step":
migrationContext.CutOverType = base.CutOverTwoStep
Expand Down
209 changes: 204 additions & 5 deletions go/logic/applier.go
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,10 @@ import (
"github.com/outbrain/golib/sqlutils"
)

const (
atomicCutOverMagicHint = "ghost-cut-over-sentry"
)

// Applier connects and writes the the applier-server, which is the server where migration
// happens. This is typically the master, but could be a replica when `--test-on-replica` or
// `--execute-on-replica` are given.
Expand Down Expand Up @@ -77,15 +81,21 @@ func (this *Applier) validateConnection(db *gosql.DB) error {
return nil
}

// tableExists checks if a given table exists in database
func (this *Applier) tableExists(tableName string) (tableFound bool) {
// showTableStatus returns the output of `show table status like '...'` command
func (this *Applier) showTableStatus(tableName string) (rowMap sqlutils.RowMap) {
rowMap = nil
query := fmt.Sprintf(`show /* gh-ost */ table status from %s like '%s'`, sql.EscapeName(this.migrationContext.DatabaseName), tableName)

sqlutils.QueryRowsMap(this.db, query, func(m sqlutils.RowMap) error {
tableFound = true
rowMap = m
return nil
})
return tableFound
return rowMap
}

// tableExists checks if a given table exists in database
func (this *Applier) tableExists(tableName string) (tableFound bool) {
m := this.showTableStatus(tableName)
return (m != nil)
}

// ValidateOrDropExistingTables verifies ghost and changelog tables do not exist,
Expand Down Expand Up @@ -775,6 +785,195 @@ func (this *Applier) ExpectProcess(sessionId int64, stateHint, infoHint string)
return nil
}

// DropAtomicCutOverSentryTableIfExists checks if the "old" table name
// happens to be a cut-over magic table; if so, it drops it.
func (this *Applier) DropAtomicCutOverSentryTableIfExists() error {
log.Infof("Looking for magic cut-over table")
tableName := this.migrationContext.GetOldTableName()
rowMap := this.showTableStatus(tableName)
if rowMap == nil {
// Table does not exist
return nil
}
if rowMap["Comment"].String != atomicCutOverMagicHint {
return fmt.Errorf("Expected magic comment on %s, did not find it", tableName)
}
log.Infof("Dropping magic cut-over table")
return this.dropTable(tableName)
}

// DropAtomicCutOverSentryTableIfExists checks if the "old" table name
// happens to be a cut-over magic table; if so, it drops it.
func (this *Applier) CreateAtomicCutOverSentryTable() error {
if err := this.DropAtomicCutOverSentryTableIfExists(); err != nil {
return err
}
tableName := this.migrationContext.GetOldTableName()

query := fmt.Sprintf(`create /* gh-ost */ table %s.%s (
id int auto_increment primary key
) comment='%s'
`,
sql.EscapeName(this.migrationContext.DatabaseName),
sql.EscapeName(tableName),
atomicCutOverMagicHint,
)
log.Infof("Creating magic cut-over table %s.%s",
sql.EscapeName(this.migrationContext.DatabaseName),
sql.EscapeName(tableName),
)
if _, err := sqlutils.ExecNoPrepare(this.db, query); err != nil {
return err
}
log.Infof("Magic cut-over table created")

return nil
}

// AtomicCutOverMagicLock
func (this *Applier) AtomicCutOverMagicLock(sessionIdChan chan int64, tableLocked chan<- error, okToUnlockTable <-chan bool, tableUnlocked chan<- error) error {
tx, err := this.db.Begin()
if err != nil {
tableLocked <- err
return err
}
defer func() {
sessionIdChan <- -1
tableLocked <- fmt.Errorf("Unexpected error in AtomicCutOverMagicLock(), injected to release blocking channel reads")
tableUnlocked <- fmt.Errorf("Unexpected error in AtomicCutOverMagicLock(), injected to release blocking channel reads")
tx.Rollback()
}()

var sessionId int64
if err := tx.QueryRow(`select connection_id()`).Scan(&sessionId); err != nil {
tableLocked <- err
return err
}
sessionIdChan <- sessionId

lockResult := 0
query := `select get_lock(?, 0)`
lockName := this.GetSessionLockName(sessionId)
log.Infof("Grabbing voluntary lock: %s", lockName)
if err := tx.QueryRow(query, lockName).Scan(&lockResult); err != nil || lockResult != 1 {
err := fmt.Errorf("Unable to acquire lock %s", lockName)
tableLocked <- err
return err
}

tableLockTimeoutSeconds := this.migrationContext.SwapTablesTimeoutSeconds * 2
log.Infof("Setting LOCK timeout as %d seconds", tableLockTimeoutSeconds)
query = fmt.Sprintf(`set session lock_wait_timeout:=%d`, tableLockTimeoutSeconds)
if _, err := tx.Exec(query); err != nil {
tableLocked <- err
return err
}

if err := this.CreateAtomicCutOverSentryTable(); err != nil {
tableLocked <- err
return err
}

query = fmt.Sprintf(`lock /* gh-ost */ tables %s.%s write, %s.%s write`,
sql.EscapeName(this.migrationContext.DatabaseName),
sql.EscapeName(this.migrationContext.OriginalTableName),
sql.EscapeName(this.migrationContext.DatabaseName),
sql.EscapeName(this.migrationContext.GetOldTableName()),
)
log.Infof("Locking %s.%s, %s.%s",
sql.EscapeName(this.migrationContext.DatabaseName),
sql.EscapeName(this.migrationContext.OriginalTableName),
sql.EscapeName(this.migrationContext.DatabaseName),
sql.EscapeName(this.migrationContext.GetOldTableName()),
)
this.migrationContext.LockTablesStartTime = time.Now()
if _, err := tx.Exec(query); err != nil {
tableLocked <- err
return err
}
log.Infof("Tables locked")
tableLocked <- nil // No error.

// From this point on, we are committed to UNLOCK TABLES. No matter what happens,
// the UNLOCK must execute (or, alternatively, this connection dies, which gets the same impact)

// The cut-over phase will proceed to apply remaining backlog onto ghost table,
// and issue RENAME. We wait here until told to proceed.
<-okToUnlockTable
log.Infof("Will now proceed to drop magic table and unlock tables")

// The magic table is here because we locked it. And we are the only ones allowed to drop it.
// And in fact, we will:
log.Infof("Dropping magic cut-over table")
query = fmt.Sprintf(`drop /* gh-ost */ table if exists %s.%s`,
sql.EscapeName(this.migrationContext.DatabaseName),
sql.EscapeName(this.migrationContext.GetOldTableName()),
)
if _, err := tx.Exec(query); err != nil {
log.Errore(err)
// We DO NOT return here because we must `UNLOCK TABLES`!
}

// Tables still locked
log.Infof("Releasing lock from %s.%s, %s.%s",
sql.EscapeName(this.migrationContext.DatabaseName),
sql.EscapeName(this.migrationContext.OriginalTableName),
sql.EscapeName(this.migrationContext.DatabaseName),
sql.EscapeName(this.migrationContext.GetOldTableName()),
)
query = `unlock tables`
if _, err := tx.Exec(query); err != nil {
tableUnlocked <- err
return log.Errore(err)
}
log.Infof("Tables unlocked")
tableUnlocked <- nil
return nil
}

// RenameOriginalTable will attempt renaming the original table into _old
func (this *Applier) AtomicCutoverRename(sessionIdChan chan int64, tablesRenamed chan<- error) error {
tx, err := this.db.Begin()
if err != nil {
return err
}
defer func() {
tx.Rollback()
sessionIdChan <- -1
tablesRenamed <- fmt.Errorf("Unexpected error in AtomicCutoverRename(), injected to release blocking channel reads")
}()
var sessionId int64
if err := tx.QueryRow(`select connection_id()`).Scan(&sessionId); err != nil {
return err
}
sessionIdChan <- sessionId

log.Infof("Setting RENAME timeout as %d seconds", this.migrationContext.SwapTablesTimeoutSeconds)
query := fmt.Sprintf(`set session lock_wait_timeout:=%d`, this.migrationContext.SwapTablesTimeoutSeconds)
if _, err := tx.Exec(query); err != nil {
return err
}

query = fmt.Sprintf(`rename /* gh-ost */ table %s.%s to %s.%s, %s.%s to %s.%s`,
sql.EscapeName(this.migrationContext.DatabaseName),
sql.EscapeName(this.migrationContext.OriginalTableName),
sql.EscapeName(this.migrationContext.DatabaseName),
sql.EscapeName(this.migrationContext.GetOldTableName()),
sql.EscapeName(this.migrationContext.DatabaseName),
sql.EscapeName(this.migrationContext.GetGhostTableName()),
sql.EscapeName(this.migrationContext.DatabaseName),
sql.EscapeName(this.migrationContext.OriginalTableName),
)
log.Infof("Issuing and expecting this to block: %s", query)
if _, err := tx.Exec(query); err != nil {
tablesRenamed <- err
return log.Errore(err)
}
tablesRenamed <- nil
log.Infof("Tables renamed")
return nil
}

func (this *Applier) ShowStatusVariable(variableName string) (result int64, err error) {
query := fmt.Sprintf(`show global status like '%s'`, variableName)
if err := this.db.QueryRow(query).Scan(&variableName, &result); err != nil {
Expand Down
Loading