diff --git a/doc/command-line-flags.md b/doc/command-line-flags.md
index 021462fa2..ac491b268 100644
--- a/doc/command-line-flags.md
+++ b/doc/command-line-flags.md
@@ -61,6 +61,9 @@ It is not reliable to parse the `ALTER` statement to determine if it is instant
 
 `gh-ost` will automatically fallback to the normal DDL process if the attempt to use instant DDL is unsuccessful.
 
+### binlogsyncer-max-reconnect-attempts
+`--binlogsyncer-max-reconnect-attempts=0`: the maximum number of attempts the binlog syncer makes to re-establish a broken inspector connection. `0` (the default) or a negative number means retry indefinitely.
+
 ### conf
 
 `--conf=/path/to/my.cnf`: file where credentials are specified. Should be in (or contain) the following format:
diff --git a/go/base/context.go b/go/base/context.go
index e3472f5bd..63b6d6da7 100644
--- a/go/base/context.go
+++ b/go/base/context.go
@@ -232,6 +232,8 @@ type MigrationContext struct {
 
 	recentBinlogCoordinates mysql.BinlogCoordinates
 
+	BinlogSyncerMaxReconnectAttempts int
+
 	Log Logger
 }
 
diff --git a/go/binlog/gomysql_reader.go b/go/binlog/gomysql_reader.go
index 6b7d06cc1..8cf35eabe 100644
--- a/go/binlog/gomysql_reader.go
+++ b/go/binlog/gomysql_reader.go
@@ -36,14 +36,15 @@ func NewGoMySQLReader(migrationContext *base.MigrationContext) *GoMySQLReader {
 		currentCoordinates:      mysql.BinlogCoordinates{},
 		currentCoordinatesMutex: &sync.Mutex{},
 		binlogSyncer: replication.NewBinlogSyncer(replication.BinlogSyncerConfig{
-			ServerID:   uint32(migrationContext.ReplicaServerId),
-			Flavor:     gomysql.MySQLFlavor,
-			Host:       connectionConfig.Key.Hostname,
-			Port:       uint16(connectionConfig.Key.Port),
-			User:       connectionConfig.User,
-			Password:   connectionConfig.Password,
-			TLSConfig:  connectionConfig.TLSConfig(),
-			UseDecimal: true,
+			ServerID:             uint32(migrationContext.ReplicaServerId),
+			Flavor:               gomysql.MySQLFlavor,
+			Host:                 connectionConfig.Key.Hostname,
+			Port:                 uint16(connectionConfig.Key.Port),
+			User:                 connectionConfig.User,
+			Password:             connectionConfig.Password,
+			TLSConfig:            connectionConfig.TLSConfig(),
+			UseDecimal:           true,
+			MaxReconnectAttempts: migrationContext.BinlogSyncerMaxReconnectAttempts,
 		}),
 	}
 }
diff --git a/go/cmd/gh-ost/main.go b/go/cmd/gh-ost/main.go
index 3daf24441..926223293 100644
--- a/go/cmd/gh-ost/main.go
+++ b/go/cmd/gh-ost/main.go
@@ -134,6 +134,7 @@ func main() {
 	flag.Int64Var(&migrationContext.HooksStatusIntervalSec, "hooks-status-interval", 60, "how many seconds to wait between calling onStatus hook")
 
 	flag.UintVar(&migrationContext.ReplicaServerId, "replica-server-id", 99999, "server id used by gh-ost process. Default: 99999")
+	flag.IntVar(&migrationContext.BinlogSyncerMaxReconnectAttempts, "binlogsyncer-max-reconnect-attempts", 0, "maximum number of attempts to re-establish a broken binlog syncer connection to the inspector; 0 or a negative number means retry indefinitely")
 
 	maxLoad := flag.String("max-load", "", "Comma delimited status-name=threshold. e.g: 'Threads_running=100,Threads_connected=500'. When status exceeds threshold, app throttles writes")
 	criticalLoad := flag.String("critical-load", "", "Comma delimited status-name=threshold, same format as --max-load. When status exceeds threshold, app panics and quits")
diff --git a/go/logic/applier.go b/go/logic/applier.go
index ad6368e61..9554c59d0 100644
--- a/go/logic/applier.go
+++ b/go/logic/applier.go
@@ -9,7 +9,6 @@ import (
 	gosql "database/sql"
 	"fmt"
 	"strings"
-	"sync"
 	"sync/atomic"
 	"time"
 
@@ -935,7 +934,7 @@ func (this *Applier) CreateAtomicCutOverSentryTable() error {
 }
 
 // AtomicCutOverMagicLock
-func (this *Applier) AtomicCutOverMagicLock(sessionIdChan chan int64, tableLocked chan<- error, okToUnlockTable <-chan bool, tableUnlocked chan<- error, dropCutOverSentryTableOnce *sync.Once) error {
+func (this *Applier) AtomicCutOverMagicLock(sessionIdChan chan int64, tableLocked chan<- error, okToUnlockTable <-chan bool, tableUnlocked chan<- error) error {
 	tx, err := this.db.Begin()
 	if err != nil {
 		tableLocked <- err
@@ -946,6 +945,7 @@ func (this *Applier) AtomicCutOverMagicLock(sessionIdChan chan int64, tableLocke
 		tableLocked <- fmt.Errorf("Unexpected error in AtomicCutOverMagicLock(), injected to release blocking channel reads")
 		tableUnlocked <- fmt.Errorf("Unexpected error in AtomicCutOverMagicLock(), injected to release blocking channel reads")
 		tx.Rollback()
+		this.DropAtomicCutOverSentryTableIfExists()
 	}()
 
 	var sessionId int64
@@ -1014,12 +1014,10 @@ func (this *Applier) AtomicCutOverMagicLock(sessionIdChan chan int64, tableLocke
 		sql.EscapeName(this.migrationContext.GetOldTableName()),
 	)
 
-	dropCutOverSentryTableOnce.Do(func() {
-		if _, err := tx.Exec(query); err != nil {
-			this.migrationContext.Log.Errore(err)
-			// We DO NOT return here because we must `UNLOCK TABLES`!
-		}
-	})
+	if _, err := tx.Exec(query); err != nil {
+		this.migrationContext.Log.Errore(err)
+		// We DO NOT return here because we must `UNLOCK TABLES`!
+	}
 
 	// Tables still locked
 	this.migrationContext.Log.Infof("Releasing lock from %s.%s, %s.%s",
diff --git a/go/logic/migrator.go b/go/logic/migrator.go
index a102188a8..b4d0a9ae1 100644
--- a/go/logic/migrator.go
+++ b/go/logic/migrator.go
@@ -13,7 +13,6 @@ import (
 	"math"
 	"os"
 	"strings"
-	"sync"
 	"sync/atomic"
 	"time"
 
@@ -639,12 +638,8 @@ func (this *Migrator) atomicCutOver() (err error) {
 	defer atomic.StoreInt64(&this.migrationContext.InCutOverCriticalSectionFlag, 0)
 
 	okToUnlockTable := make(chan bool, 4)
-	var dropCutOverSentryTableOnce sync.Once
 	defer func() {
 		okToUnlockTable <- true
-		dropCutOverSentryTableOnce.Do(func() {
-			this.applier.DropAtomicCutOverSentryTableIfExists()
-		})
 	}()
 
 	atomic.StoreInt64(&this.migrationContext.AllEventsUpToLockProcessedInjectedFlag, 0)
@@ -653,7 +648,7 @@ func (this *Migrator) atomicCutOver() (err error) {
 	tableLocked := make(chan error, 2)
 	tableUnlocked := make(chan error, 2)
 	go func() {
-		if err := this.applier.AtomicCutOverMagicLock(lockOriginalSessionIdChan, tableLocked, okToUnlockTable, tableUnlocked, &dropCutOverSentryTableOnce); err != nil {
+		if err := this.applier.AtomicCutOverMagicLock(lockOriginalSessionIdChan, tableLocked, okToUnlockTable, tableUnlocked); err != nil {
 			this.migrationContext.Log.Errore(err)
 		}
 	}()
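Note on the new flag: `--binlogsyncer-max-reconnect-attempts` is passed straight through to go-mysql's `replication.BinlogSyncerConfig.MaxReconnectAttempts`, so its semantics (zero or negative means infinite retry) come from that library, not from gh-ost itself. The sketch below shows the setting in isolation, outside gh-ost; the import path, host, credentials, and binlog position are placeholder assumptions, not values from this patch.

```go
package main

import (
	"context"
	"fmt"

	"github.com/go-mysql-org/go-mysql/mysql"
	"github.com/go-mysql-org/go-mysql/replication"
)

func main() {
	cfg := replication.BinlogSyncerConfig{
		ServerID: 99999,
		Flavor:   mysql.MySQLFlavor,
		Host:     "127.0.0.1", // placeholder
		Port:     3306,
		User:     "gh-ost", // placeholder
		Password: "secret", // placeholder
		// Zero or a negative value makes the syncer retry a broken
		// connection forever; a positive value caps the attempts, after
		// which the event stream surfaces an error instead.
		MaxReconnectAttempts: 3,
	}
	syncer := replication.NewBinlogSyncer(cfg)
	defer syncer.Close()

	streamer, err := syncer.StartSync(mysql.Position{Name: "mysql-bin.000001", Pos: 4})
	if err != nil {
		panic(err)
	}
	for {
		ev, err := streamer.GetEvent(context.Background())
		if err != nil {
			// With a finite MaxReconnectAttempts, a persistent outage
			// eventually lands here rather than retrying forever.
			panic(err)
		}
		fmt.Println(ev.Header.EventType)
	}
}
```

gh-ost keeps the default at `0` (infinite retry), matching the library's prior behavior, so the flag is purely opt-in.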
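Note on the cut-over change: the shared `*sync.Once` that coordinated dropping the atomic cut-over sentry table between `Migrator.atomicCutOver` and `Applier.AtomicCutOverMagicLock` is gone; the lock-holding goroutine now owns the cleanup in its own `defer`, which is safe to run unconditionally because the drop is an idempotent `DROP TABLE IF EXISTS`. A minimal sketch of that ownership change, using a hypothetical `dropSentryTable` stand-in rather than gh-ost's actual applier:

```go
package main

import (
	"fmt"
	"sync"
)

// dropSentryTable is a hypothetical stand-in for the applier's idempotent
// sentry-table cleanup (a DROP TABLE IF EXISTS in gh-ost).
func dropSentryTable() { fmt.Println("DROP TABLE IF EXISTS _sentry_del") }

// before: two goroutines share a *sync.Once so whichever path runs first
// performs the cleanup, and the other path's call becomes a no-op.
func before() {
	var once sync.Once
	done := make(chan struct{})
	go func() {
		defer close(done)
		defer once.Do(dropSentryTable) // lock goroutine's cleanup path
	}()
	<-done
	once.Do(dropSentryTable) // caller's fallback path; no-op here
}

// after: the goroutine holding the lock is the single owner of the cleanup,
// so a plain defer suffices and the Once (and the extra parameter threading
// it through AtomicCutOverMagicLock) disappears.
func after() {
	done := make(chan struct{})
	go func() {
		defer close(done)
		defer dropSentryTable()
	}()
	<-done
}

func main() {
	before()
	after()
}
```

Since the drop is safe to run more than once, giving up the once-only guarantee costs at most a redundant statement while noticeably simplifying both call signatures.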