Skip to content

Commit

Permalink
实现function方式调用(kill或异常终止时不会panic). gh-ost: github#479,github#577
Browse files Browse the repository at this point in the history
  • Loading branch information
hanchuanchuan committed Mar 4, 2019
1 parent 1cb650b commit afefb76
Showing 1 changed file with 30 additions and 16 deletions.
46 changes: 30 additions & 16 deletions go/logic/migrator.go
Original file line number Diff line number Diff line change
Expand Up @@ -88,7 +88,8 @@ type Migrator struct {
finishedMigrating int64

// Log *io.Writer
Log *bytes.Buffer
Log *bytes.Buffer
Error error
}

func NewMigrator(context *base.MigrationContext) *Migrator {
Expand All @@ -100,6 +101,8 @@ func NewMigrator(context *base.MigrationContext) *Migrator {
rowCopyComplete: make(chan error),
allEventsUpToLockProcessed: make(chan string),

// done: make(chan error),

copyRowsQueue: make(chan tableWriteFunc),
applyEventsQueue: make(chan *applyEventStruct, base.MaxEventsBatchSize),
handledChangelogStates: make(map[string]bool),
Expand Down Expand Up @@ -324,7 +327,8 @@ func (this *Migrator) Migrate() (err error) {
return err
}

go this.listenOnPanicAbort()

// go this.listenOnPanicAbort()

if err := this.initiateHooksExecutor(); err != nil {
return err
Expand Down Expand Up @@ -356,6 +360,14 @@ func (this *Migrator) Migrate() (err error) {
return err
}

go this.FinalizeMigration()

migrateErr := <-this.migrationContext.PanicAbort
return migrateErr
}

func (this *Migrator) FinalizeMigration() {

initialLag, _ := this.inspector.getReplicationLag()
log.Infof("Waiting for ghost table to be migrated. Current lag is %+v", initialLag)
<-this.ghostTableMigrated
Expand All @@ -365,32 +377,32 @@ func (this *Migrator) Migrate() (err error) {
// on master this is always true, of course, and yet it also implies this knowledge
// is in the binlogs.
if err := this.inspector.inspectOriginalAndGhostTables(); err != nil {
return err
this.migrationContext.PanicAbort <- err
}
// Validation complete! We're good to execute this migration
if err := this.hooksExecutor.onValidated(); err != nil {
return err
this.migrationContext.PanicAbort <- err
}

if err := this.initiateServer(); err != nil {
return err
this.migrationContext.PanicAbort <- err
}
defer this.server.RemoveSocketFile()

if err := this.countTableRows(); err != nil {
return err
this.migrationContext.PanicAbort <- err
}
if err := this.addDMLEventsListener(); err != nil {
return err
this.migrationContext.PanicAbort <- err
}
if err := this.applier.ReadMigrationRangeValues(); err != nil {
return err
this.migrationContext.PanicAbort <- err
}
if err := this.initiateThrottler(); err != nil {
return err
this.migrationContext.PanicAbort <- err
}
if err := this.hooksExecutor.onBeforeRowCopy(); err != nil {
return err
this.migrationContext.PanicAbort <- err
}
go this.executeWriteFuncs()
go this.iterateChunks()
Expand All @@ -401,12 +413,12 @@ func (this *Migrator) Migrate() (err error) {
this.consumeRowCopyComplete()
log.Infof("Row copy complete")
if err := this.hooksExecutor.onRowCopyComplete(); err != nil {
return err
this.migrationContext.PanicAbort <- err
}
this.printStatus(ForcePrintStatusRule)

if err := this.hooksExecutor.onBeforeCutOver(); err != nil {
return err
this.migrationContext.PanicAbort <- err
}
var retrier func(func() error, ...bool) error
if this.migrationContext.CutOverExponentialBackoff {
Expand All @@ -415,18 +427,20 @@ func (this *Migrator) Migrate() (err error) {
retrier = this.retryOperation
}
if err := retrier(this.cutOver); err != nil {
return err
this.migrationContext.PanicAbort <- err
}
atomic.StoreInt64(&this.migrationContext.CutOverCompleteFlag, 1)

if err := this.finalCleanup(); err != nil {
return nil
// return nil
this.migrationContext.PanicAbort <- nil
}
if err := this.hooksExecutor.onSuccess(); err != nil {
return err
this.migrationContext.PanicAbort <- nil
}
log.Infof("Done migrating %s.%s", sql.EscapeName(this.migrationContext.DatabaseName), sql.EscapeName(this.migrationContext.OriginalTableName))
return nil

this.migrationContext.PanicAbort <- nil
}

// ExecOnFailureHook executes the onFailure hook, and this method is provided as the only external
Expand Down

0 comments on commit afefb76

Please sign in to comment.