@@ -65,13 +65,12 @@ type ExtractorOracle struct {
6565 //dumpers []*mysql.dumper
6666 // db.tb exists when creating the job, for full-copy.
6767 // vs e.mysqlContext.ReplicateDoDb: all user assigned db.tb
68- replicateDoDb []* common.DataSource
69- dataChannel chan * common.EntryContext
70- inspector * mysql.Inspector
71- binlogReader * binlog.BinlogReader
72- LogMinerStream * LogMinerStream
73- initialBinlogCoordinates * common.OracleCoordinates
74- currentBinlogCoordinates * common.OracleCoordinateTx
68+ replicateDoDb []* common.DataSource
69+ dataChannel chan * common.EntryContext
70+ inspector * mysql.Inspector
71+ binlogReader * binlog.BinlogReader
72+ LogMinerStream * LogMinerStream
73+ fullCopyCoordinates * common.OracleCoordinates
7574 //rowCopyComplete chan bool
7675 rowCopyCompleteFlag int64
7776 tableCount int
@@ -207,20 +206,16 @@ func (e *ExtractorOracle) Run() {
207206
208207 e .initDBConnections ()
209208
209+ startSCN , committedSCN , err := e .calculateSCNPos ()
210+ if err != nil {
211+ e .onError (common .TaskStateDead , errors .Wrap (err , "calculateSCNPos" ))
212+ return
213+ }
210214 fullCopy := true
211- {
212- startSCN , committedSCN , err := e .calculateSCNPos ()
213- if err != nil {
214- e .onError (common .TaskStateDead , errors .Wrap (err , "calculateSCNPos" ))
215- return
216- }
217- // todo
218- notFullCopy := true
219- if notFullCopy {
220- fullCopy = false
221- } else if startSCN != 0 || committedSCN != 0 {
222- fullCopy = false
223- }
215+ if e .mysqlContext .OracleConfig .Scn != 0 {
216+ fullCopy = false
217+ } else if startSCN != 0 || committedSCN != 0 {
218+ fullCopy = false
224219 }
225220
226221 if fullCopy {
@@ -239,21 +234,19 @@ func (e *ExtractorOracle) Run() {
239234 e .onError (common .TaskStateDead , err )
240235 return
241236 }
237+ }
242238
243- {
244- startSCN , committedSCN , err := e .calculateSCNPos ()
245- if err != nil {
246- e .onError (common .TaskStateDead , errors .Wrap (err , "calculateSCNPos" ))
247- return
248- }
249- e .LogMinerStream = NewLogMinerStream (e .oracleDB , e .logger , e .mysqlContext .ReplicateDoDb , e .mysqlContext .ReplicateIgnoreDb ,
250- startSCN , committedSCN , 100000 )
251- e .logger .Debug ("start .initiateStreaming before" )
252- if err := e .initiateStreaming (); err != nil {
253- e .logger .Error ("error at initiateStreaming" , "err" , err )
254- e .onError (common .TaskStateDead , err )
255- return
256- }
239+ {
240+ if startSCN == 0 && committedSCN == 0 {
241+ startSCN , committedSCN = e .oracleDB .SCN , e .oracleDB .SCN
242+ }
243+ e .LogMinerStream = NewLogMinerStream (e .oracleDB , e .logger , e .mysqlContext .ReplicateDoDb , e .mysqlContext .ReplicateIgnoreDb ,
244+ startSCN , committedSCN , 100000 )
245+ e .logger .Debug ("start .initiateStreaming before" )
246+ if err := e .initiateStreaming (); err != nil {
247+ e .logger .Error ("error at initiateStreaming" , "err" , err )
248+ e .onError (common .TaskStateDead , err )
249+ return
257250 }
258251 }
259252}
@@ -839,7 +832,7 @@ func (e *ExtractorOracle) onError(state int, err error) {
839832
840833func (e * ExtractorOracle ) sendFullComplete () (err error ) {
841834 dumpMsg , err := common .Encode (& common.DumpStatResult {
842- Coord : e .initialBinlogCoordinates ,
835+ Coord : e .fullCopyCoordinates ,
843836 })
844837 if err != nil {
845838 return err
@@ -979,6 +972,9 @@ func (e *ExtractorOracle) oracleDump() error {
979972 }
980973 }
981974 }
975+ e .fullCopyCoordinates = & common.OracleCoordinates {
976+ LaststSCN : e .oracleDB .SCN ,
977+ }
982978 return nil
983979}
984980
0 commit comments