@@ -207,12 +207,20 @@ func (this *Migrator) canStopStreaming() bool {
207
207
return atomic .LoadInt64 (& this .migrationContext .CutOverCompleteFlag ) != 0
208
208
}
209
209
210
- // onChangelogStateEvent is called when a binlog event operation on the changelog table is intercepted.
211
- func (this * Migrator ) onChangelogStateEvent (dmlEvent * binlog.BinlogDMLEvent ) (err error ) {
210
+ // onChangelogEvent is called when a binlog event operation on the changelog table is intercepted.
211
+ func (this * Migrator ) onChangelogEvent (dmlEvent * binlog.BinlogDMLEvent ) (err error ) {
212
212
// Hey, I created the changelog table, I know the type of columns it has!
213
- if hint := dmlEvent .NewColumnValues .StringColumn (2 ); hint != "state" {
213
+ switch hint := dmlEvent .NewColumnValues .StringColumn (2 ); hint {
214
+ case "state" :
215
+ return this .onChangelogStateEvent (dmlEvent )
216
+ case "heartbeat" :
217
+ return this .onChangelogHeartbeatEvent (dmlEvent )
218
+ default :
214
219
return nil
215
220
}
221
+ }
222
+
223
+ func (this * Migrator ) onChangelogStateEvent (dmlEvent * binlog.BinlogDMLEvent ) (err error ) {
216
224
changelogStateString := dmlEvent .NewColumnValues .StringColumn (3 )
217
225
changelogState := ReadChangelogState (changelogStateString )
218
226
this .migrationContext .Log .Infof ("Intercepted changelog state %s" , changelogState )
@@ -245,6 +253,18 @@ func (this *Migrator) onChangelogStateEvent(dmlEvent *binlog.BinlogDMLEvent) (er
245
253
return nil
246
254
}
247
255
256
+ func (this * Migrator ) onChangelogHeartbeatEvent (dmlEvent * binlog.BinlogDMLEvent ) (err error ) {
257
+ changelogHeartbeatString := dmlEvent .NewColumnValues .StringColumn (3 )
258
+
259
+ heartbeatTime , err := time .Parse (time .RFC3339Nano , changelogHeartbeatString )
260
+ if err != nil {
261
+ return this .migrationContext .Log .Errore (err )
262
+ } else {
263
+ this .migrationContext .SetLastHeartbeatOnChangelogTime (heartbeatTime )
264
+ return nil
265
+ }
266
+ }
267
+
248
268
// listenOnPanicAbort aborts on abort request
249
269
func (this * Migrator ) listenOnPanicAbort () {
250
270
err := <- this .migrationContext .PanicAbort
@@ -476,6 +496,13 @@ func (this *Migrator) cutOver() (err error) {
476
496
this .migrationContext .Log .Debugf ("checking for cut-over postpone" )
477
497
this .sleepWhileTrue (
478
498
func () (bool , error ) {
499
+ heartbeatLag := this .migrationContext .TimeSinceLastHeartbeatOnChangelog ()
500
+ maxLagMillisecondsThrottle := time .Duration (atomic .LoadInt64 (& this .migrationContext .MaxLagMillisecondsThrottleThreshold )) * time .Millisecond
501
+ cutOverLockTimeout := time .Duration (this .migrationContext .CutOverLockTimeoutSeconds ) * time .Second
502
+ if heartbeatLag > maxLagMillisecondsThrottle || heartbeatLag > cutOverLockTimeout {
503
+ this .migrationContext .Log .Debugf ("current HeartbeatLag (%.2fs) is too high, it needs to be less than both --max-lag-millis (%.2fs) and --cut-over-lock-timeout-seconds (%.2fs) to continue" , heartbeatLag .Seconds (), maxLagMillisecondsThrottle .Seconds (), cutOverLockTimeout .Seconds ())
504
+ return true , nil
505
+ }
479
506
if this .migrationContext .PostponeCutOverFlagFile == "" {
480
507
return false , nil
481
508
}
@@ -962,13 +989,14 @@ func (this *Migrator) printStatus(rule PrintStatusRule, writers ...io.Writer) {
962
989
963
990
currentBinlogCoordinates := * this .eventsStreamer .GetCurrentBinlogCoordinates ()
964
991
965
- status := fmt .Sprintf ("Copy: %d/%d %.1f%%; Applied: %d; Backlog: %d/%d; Time: %+v(total), %+v(copy); streamer: %+v; Lag: %.2fs, State: %s; ETA: %s" ,
992
+ status := fmt .Sprintf ("Copy: %d/%d %.1f%%; Applied: %d; Backlog: %d/%d; Time: %+v(total), %+v(copy); streamer: %+v; Lag: %.2fs, HeartbeatLag: %.2fs, State: %s; ETA: %s" ,
966
993
totalRowsCopied , rowsEstimate , progressPct ,
967
994
atomic .LoadInt64 (& this .migrationContext .TotalDMLEventsApplied ),
968
995
len (this .applyEventsQueue ), cap (this .applyEventsQueue ),
969
996
base .PrettifyDurationOutput (elapsedTime ), base .PrettifyDurationOutput (this .migrationContext .ElapsedRowCopyTime ()),
970
997
currentBinlogCoordinates ,
971
998
this .migrationContext .GetCurrentLagDuration ().Seconds (),
999
+ this .migrationContext .TimeSinceLastHeartbeatOnChangelog ().Seconds (),
972
1000
state ,
973
1001
eta ,
974
1002
)
@@ -995,7 +1023,7 @@ func (this *Migrator) initiateStreaming() error {
995
1023
this .migrationContext .DatabaseName ,
996
1024
this .migrationContext .GetChangelogTableName (),
997
1025
func (dmlEvent * binlog.BinlogDMLEvent ) error {
998
- return this .onChangelogStateEvent (dmlEvent )
1026
+ return this .onChangelogEvent (dmlEvent )
999
1027
},
1000
1028
)
1001
1029
0 commit comments