Skip to content

Commit

Permalink
an atomic cut-over implementation, as per issue #82
Browse files Browse the repository at this point in the history
  • Loading branch information
Shlomi Noach committed Jun 27, 2016
1 parent ad25e60 commit 0191b28
Show file tree
Hide file tree
Showing 5 changed files with 312 additions and 11 deletions.
2 changes: 1 addition & 1 deletion build.sh
Original file line number Diff line number Diff line change
@@ -1,7 +1,7 @@
#!/bin/bash
#
#
RELEASE_VERSION="0.9.7"
RELEASE_VERSION="0.9.8"

buildpath=/tmp/gh-ost
target=gh-ost
Expand Down
3 changes: 2 additions & 1 deletion go/base/context.go
Original file line number Diff line number Diff line change
Expand Up @@ -30,7 +30,8 @@ const (
type CutOver int

const (
CutOverSafe CutOver = iota
CutOverAtomic CutOver = iota
CutOverSafe = iota
CutOverTwoStep = iota
)

Expand Down
6 changes: 4 additions & 2 deletions go/cmd/gh-ost/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -66,7 +66,7 @@ func main() {
flag.BoolVar(&migrationContext.OkToDropTable, "ok-to-drop-table", false, "Shall the tool drop the old table at end of operation. DROPping tables can be a long locking operation, which is why I'm not doing it by default. I'm an online tool, yes?")
flag.BoolVar(&migrationContext.InitiallyDropOldTable, "initially-drop-old-table", false, "Drop a possibly existing OLD table (remains from a previous run?) before beginning operation. Default is to panic and abort if such table exists")
flag.BoolVar(&migrationContext.InitiallyDropGhostTable, "initially-drop-ghost-table", false, "Drop a possibly existing Ghost table (remains from a previous run?) before beginning operation. Default is to panic and abort if such table exists")
cutOver := flag.String("cut-over", "", "(mandatory) choose cut-over type (two-step, voluntary-lock)")
cutOver := flag.String("cut-over", "atomic", "choose cut-over type (atomic, two-step, voluntary-lock)")

flag.BoolVar(&migrationContext.SwitchToRowBinlogFormat, "switch-to-rbr", false, "let this tool automatically switch binary log format to 'ROW' on the replica, if needed. The format will NOT be switched back. I'm too scared to do that, and wish to protect you if you happen to execute another migration while this one is running")
chunkSize := flag.Int64("chunk-size", 1000, "amount of rows to handle in each iteration (allowed range: 100-100,000)")
Expand Down Expand Up @@ -144,7 +144,9 @@ func main() {
}

switch *cutOver {
case "safe", "default", "":
case "atomic", "default", "":
migrationContext.CutOverType = base.CutOverAtomic
case "safe":
migrationContext.CutOverType = base.CutOverSafe
case "two-step":
migrationContext.CutOverType = base.CutOverTwoStep
Expand Down
209 changes: 204 additions & 5 deletions go/logic/applier.go
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,10 @@ import (
"github.com/outbrain/golib/sqlutils"
)

const (
atomicCutOverMagicHint = "ghost-cut-over-sentry"
)

// Applier connects and writes the the applier-server, which is the server where migration
// happens. This is typically the master, but could be a replica when `--test-on-replica` or
// `--execute-on-replica` are given.
Expand Down Expand Up @@ -77,15 +81,21 @@ func (this *Applier) validateConnection(db *gosql.DB) error {
return nil
}

// tableExists checks if a given table exists in database
func (this *Applier) tableExists(tableName string) (tableFound bool) {
// showTableStatus returns the output of `show table status like '...'` command
func (this *Applier) showTableStatus(tableName string) (rowMap sqlutils.RowMap) {
rowMap = nil
query := fmt.Sprintf(`show /* gh-ost */ table status from %s like '%s'`, sql.EscapeName(this.migrationContext.DatabaseName), tableName)

sqlutils.QueryRowsMap(this.db, query, func(m sqlutils.RowMap) error {
tableFound = true
rowMap = m
return nil
})
return tableFound
return rowMap
}

// tableExists checks if a given table exists in database
func (this *Applier) tableExists(tableName string) (tableFound bool) {
m := this.showTableStatus(tableName)
return (m != nil)
}

// ValidateOrDropExistingTables verifies ghost and changelog tables do not exist,
Expand Down Expand Up @@ -775,6 +785,195 @@ func (this *Applier) ExpectProcess(sessionId int64, stateHint, infoHint string)
return nil
}

// DropAtomicCutOverSentryTableIfExists checks if the "old" table name
// happens to be a cut-over magic table; if so, it drops it.
func (this *Applier) DropAtomicCutOverSentryTableIfExists() error {
log.Infof("Looking for magic cut-over table")
tableName := this.migrationContext.GetOldTableName()
rowMap := this.showTableStatus(tableName)
if rowMap == nil {
// Table does not exist
return nil
}
if rowMap["Comment"].String != atomicCutOverMagicHint {
return fmt.Errorf("Expected magic comment on %s, did not find it", tableName)
}
log.Infof("Dropping magic cut-over table")
return this.dropTable(tableName)
}

// DropAtomicCutOverSentryTableIfExists checks if the "old" table name
// happens to be a cut-over magic table; if so, it drops it.
func (this *Applier) CreateAtomicCutOverSentryTable() error {
if err := this.DropAtomicCutOverSentryTableIfExists(); err != nil {
return err
}
tableName := this.migrationContext.GetOldTableName()

query := fmt.Sprintf(`create /* gh-ost */ table %s.%s (
id int auto_increment primary key
) comment='%s'
`,
sql.EscapeName(this.migrationContext.DatabaseName),
sql.EscapeName(tableName),
atomicCutOverMagicHint,
)
log.Infof("Creating magic cut-over table %s.%s",
sql.EscapeName(this.migrationContext.DatabaseName),
sql.EscapeName(tableName),
)
if _, err := sqlutils.ExecNoPrepare(this.db, query); err != nil {
return err
}
log.Infof("Magic cut-over table created")

return nil
}

// AtomicCutOverMagicLock
func (this *Applier) AtomicCutOverMagicLock(sessionIdChan chan int64, tableLocked chan<- error, okToUnlockTable <-chan bool, tableUnlocked chan<- error) error {
tx, err := this.db.Begin()
if err != nil {
tableLocked <- err
return err
}
defer func() {
sessionIdChan <- -1
tableLocked <- fmt.Errorf("Unexpected error in AtomicCutOverMagicLock(), injected to release blocking channel reads")
tableUnlocked <- fmt.Errorf("Unexpected error in AtomicCutOverMagicLock(), injected to release blocking channel reads")
tx.Rollback()
}()

var sessionId int64
if err := tx.QueryRow(`select connection_id()`).Scan(&sessionId); err != nil {
tableLocked <- err
return err
}
sessionIdChan <- sessionId

lockResult := 0
query := `select get_lock(?, 0)`
lockName := this.GetSessionLockName(sessionId)
log.Infof("Grabbing voluntary lock: %s", lockName)
if err := tx.QueryRow(query, lockName).Scan(&lockResult); err != nil || lockResult != 1 {
err := fmt.Errorf("Unable to acquire lock %s", lockName)
tableLocked <- err
return err
}

tableLockTimeoutSeconds := this.migrationContext.SwapTablesTimeoutSeconds * 2
log.Infof("Setting LOCK timeout as %d seconds", tableLockTimeoutSeconds)
query = fmt.Sprintf(`set session lock_wait_timeout:=%d`, tableLockTimeoutSeconds)
if _, err := tx.Exec(query); err != nil {
tableLocked <- err
return err
}

if err := this.CreateAtomicCutOverSentryTable(); err != nil {
tableLocked <- err
return err
}

query = fmt.Sprintf(`lock /* gh-ost */ tables %s.%s write, %s.%s write`,
sql.EscapeName(this.migrationContext.DatabaseName),
sql.EscapeName(this.migrationContext.OriginalTableName),
sql.EscapeName(this.migrationContext.DatabaseName),
sql.EscapeName(this.migrationContext.GetOldTableName()),
)
log.Infof("Locking %s.%s, %s.%s",
sql.EscapeName(this.migrationContext.DatabaseName),
sql.EscapeName(this.migrationContext.OriginalTableName),
sql.EscapeName(this.migrationContext.DatabaseName),
sql.EscapeName(this.migrationContext.GetOldTableName()),
)
this.migrationContext.LockTablesStartTime = time.Now()
if _, err := tx.Exec(query); err != nil {
tableLocked <- err
return err
}
log.Infof("Tables locked")
tableLocked <- nil // No error.

// From this point on, we are committed to UNLOCK TABLES. No matter what happens,
// the UNLOCK must execute (or, alternatively, this connection dies, which gets the same impact)

// The cut-over phase will proceed to apply remaining backlog onto ghost table,
// and issue RENAME. We wait here until told to proceed.
<-okToUnlockTable
log.Infof("Will now proceed to drop magic table and unlock tables")

// The magic table is here because we locked it. And we are the only ones allowed to drop it.
// And in fact, we will:
log.Infof("Dropping magic cut-over table")
query = fmt.Sprintf(`drop /* gh-ost */ table if exists %s.%s`,
sql.EscapeName(this.migrationContext.DatabaseName),
sql.EscapeName(this.migrationContext.GetOldTableName()),
)
if _, err := tx.Exec(query); err != nil {
log.Errore(err)
// We DO NOT return here because we must `UNLOCK TABLES`!
}

// Tables still locked
log.Infof("Releasing lock from %s.%s, %s.%s",
sql.EscapeName(this.migrationContext.DatabaseName),
sql.EscapeName(this.migrationContext.OriginalTableName),
sql.EscapeName(this.migrationContext.DatabaseName),
sql.EscapeName(this.migrationContext.GetOldTableName()),
)
query = `unlock tables`
if _, err := tx.Exec(query); err != nil {
tableUnlocked <- err
return log.Errore(err)
}
log.Infof("Tables unlocked")
tableUnlocked <- nil
return nil
}

// RenameOriginalTable will attempt renaming the original table into _old
func (this *Applier) AtomicCutoverRename(sessionIdChan chan int64, tablesRenamed chan<- error) error {
tx, err := this.db.Begin()
if err != nil {
return err
}
defer func() {
tx.Rollback()
sessionIdChan <- -1
tablesRenamed <- fmt.Errorf("Unexpected error in AtomicCutoverRename(), injected to release blocking channel reads")
}()
var sessionId int64
if err := tx.QueryRow(`select connection_id()`).Scan(&sessionId); err != nil {
return err
}
sessionIdChan <- sessionId

log.Infof("Setting RENAME timeout as %d seconds", this.migrationContext.SwapTablesTimeoutSeconds)
query := fmt.Sprintf(`set session lock_wait_timeout:=%d`, this.migrationContext.SwapTablesTimeoutSeconds)
if _, err := tx.Exec(query); err != nil {
return err
}

query = fmt.Sprintf(`rename /* gh-ost */ table %s.%s to %s.%s, %s.%s to %s.%s`,
sql.EscapeName(this.migrationContext.DatabaseName),
sql.EscapeName(this.migrationContext.OriginalTableName),
sql.EscapeName(this.migrationContext.DatabaseName),
sql.EscapeName(this.migrationContext.GetOldTableName()),
sql.EscapeName(this.migrationContext.DatabaseName),
sql.EscapeName(this.migrationContext.GetGhostTableName()),
sql.EscapeName(this.migrationContext.DatabaseName),
sql.EscapeName(this.migrationContext.OriginalTableName),
)
log.Infof("Issuing and expecting this to block: %s", query)
if _, err := tx.Exec(query); err != nil {
tablesRenamed <- err
return log.Errore(err)
}
tablesRenamed <- nil
log.Infof("Tables renamed")
return nil
}

func (this *Applier) ShowStatusVariable(variableName string) (result int64, err error) {
query := fmt.Sprintf(`show global status like '%s'`, variableName)
if err := this.db.QueryRow(query).Scan(&variableName, &result); err != nil {
Expand Down
Loading

0 comments on commit 0191b28

Please sign in to comment.