Skip to content

Commit 4ffdfa2

Browse files
committed
full and incremental docking
1 parent 86860da commit 4ffdfa2

File tree

5 files changed

+88
-59
lines changed

5 files changed

+88
-59
lines changed

driver/common/schema.go

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -86,7 +86,7 @@ func (o *OracleCoordinateTx)GetFieldValue(fieldName string)interface{}{
8686

8787

8888
func (b *OracleCoordinates) GetLogPos() int64 {
89-
return 0
89+
return b.LaststSCN
9090
}
9191

9292
func (b *OracleCoordinates) GetTxSet() string {

driver/common/type.schema

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -34,7 +34,7 @@ struct MySQLCoordinates {
3434
}
3535

3636
struct OracleCoordinates {
37-
37+
LaststSCN int64
3838
}
3939

4040
struct DumpStatResult {

driver/common/type.schema.gen.go

Lines changed: 28 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -1508,10 +1508,12 @@ func (d *MySQLCoordinates) Unmarshal(buf []byte) (uint64, error) {
15081508
}
15091509

15101510
type OracleCoordinates struct {
1511+
LaststSCN int64
15111512
}
15121513

15131514
func (d *OracleCoordinates) Size() (s uint64) {
15141515

1516+
s += 8
15151517
return
15161518
}
15171519
func (d *OracleCoordinates) Marshal(buf []byte) ([]byte, error) {
@@ -1525,13 +1527,37 @@ func (d *OracleCoordinates) Marshal(buf []byte) ([]byte, error) {
15251527
}
15261528
i := uint64(0)
15271529

1528-
return buf[:i+0], nil
1530+
{
1531+
1532+
buf[0+0] = byte(d.LaststSCN >> 0)
1533+
1534+
buf[1+0] = byte(d.LaststSCN >> 8)
1535+
1536+
buf[2+0] = byte(d.LaststSCN >> 16)
1537+
1538+
buf[3+0] = byte(d.LaststSCN >> 24)
1539+
1540+
buf[4+0] = byte(d.LaststSCN >> 32)
1541+
1542+
buf[5+0] = byte(d.LaststSCN >> 40)
1543+
1544+
buf[6+0] = byte(d.LaststSCN >> 48)
1545+
1546+
buf[7+0] = byte(d.LaststSCN >> 56)
1547+
1548+
}
1549+
return buf[:i+8], nil
15291550
}
15301551

15311552
func (d *OracleCoordinates) Unmarshal(buf []byte) (uint64, error) {
15321553
i := uint64(0)
15331554

1534-
return i + 0, nil
1555+
{
1556+
1557+
d.LaststSCN = 0 | (int64(buf[0+0]) << 0) | (int64(buf[1+0]) << 8) | (int64(buf[2+0]) << 16) | (int64(buf[3+0]) << 24) | (int64(buf[4+0]) << 32) | (int64(buf[5+0]) << 40) | (int64(buf[6+0]) << 48) | (int64(buf[7+0]) << 56)
1558+
1559+
}
1560+
return i + 8, nil
15351561
}
15361562

15371563
type DumpStatResult struct {

driver/mysql/applier.go

Lines changed: 27 additions & 20 deletions
Original file line numberDiff line numberDiff line change
@@ -529,31 +529,38 @@ func (a *Applier) subscribeNats() (err error) {
529529
}
530530
a.logger.Info("Rows copy complete.", "TotalRowsReplayed", a.TotalRowsReplayed)
531531

532-
if a.mysqlContext.ForeignKeyChecks {
533-
err = a.enableForeignKeyChecks()
532+
if a.ai.sourceType == "oracle" {
533+
err = a.storeManager.SaveOracleSCNPos(a.subject, dumpData.Coord.GetLogPos(), dumpData.Coord.GetLogPos())
534534
if err != nil {
535-
a.onError(common.TaskStateDead, errors.Wrap(err, "enableForeignKeyChecks"))
536-
return
535+
a.onError(common.TaskStateDead, errors.Wrap(err, "SaveOracleSCNPos"))
537536
}
538537
} else {
539-
a.logger.Warn("ParallelWorkers > 1 and UseMySQLDependency = false. disabling MySQL session.foreign_key_checks")
540-
}
538+
if a.mysqlContext.ForeignKeyChecks {
539+
err = a.enableForeignKeyChecks()
540+
if err != nil {
541+
a.onError(common.TaskStateDead, errors.Wrap(err, "enableForeignKeyChecks"))
542+
return
543+
}
544+
} else {
545+
a.logger.Warn("ParallelWorkers > 1 and UseMySQLDependency = false. disabling MySQL session.foreign_key_checks")
546+
}
541547

542-
a.logger.Info("got gtid from extractor", "gtid", dumpData.Coord.GetTxSet())
543-
// Do not re-assign a.gtidSet (#538). Update it.
544-
gs0, err := gomysql.ParseMysqlGTIDSet(dumpData.Coord.GetTxSet())
545-
if err != nil {
546-
a.onError(common.TaskStateDead, errors.Wrap(err, "ParseMysqlGTIDSet"))
547-
return
548-
}
549-
gs := gs0.(*gomysql.MysqlGTIDSet)
550-
for _, uuidSet := range gs.Sets {
551-
a.gtidSet.AddSet(uuidSet)
548+
a.logger.Info("got gtid from extractor", "gtid", dumpData.Coord.GetTxSet())
549+
// Do not re-assign a.gtidSet (#538). Update it.
550+
gs0, err := gomysql.ParseMysqlGTIDSet(dumpData.Coord.GetTxSet())
551+
if err != nil {
552+
a.onError(common.TaskStateDead, errors.Wrap(err, "ParseMysqlGTIDSet"))
553+
return
554+
}
555+
gs := gs0.(*gomysql.MysqlGTIDSet)
556+
for _, uuidSet := range gs.Sets {
557+
a.gtidSet.AddSet(uuidSet)
558+
}
559+
a.mysqlContext.Gtid = dumpData.Coord.GetTxSet()
560+
a.mysqlContext.BinlogFile = dumpData.Coord.GetLogFile()
561+
a.mysqlContext.BinlogPos = dumpData.Coord.GetLogPos()
562+
a.gtidCh <- nil // coord == nil is a flag for update/upload gtid
552563
}
553-
a.mysqlContext.Gtid = dumpData.Coord.GetTxSet()
554-
a.mysqlContext.BinlogFile = dumpData.Coord.GetLogFile()
555-
a.mysqlContext.BinlogPos = dumpData.Coord.GetLogPos()
556-
a.gtidCh <- nil // coord == nil is a flag for update/upload gtid
557564

558565
a.mysqlContext.Stage = common.StageSlaveWaitingForWorkersToProcessQueue
559566
if a.stage != JobIncrCopy {

driver/oracle/extractor/extractor_oracle.go

Lines changed: 31 additions & 35 deletions
Original file line numberDiff line numberDiff line change
@@ -65,13 +65,12 @@ type ExtractorOracle struct {
6565
//dumpers []*mysql.dumper
6666
// db.tb exists when creating the job, for full-copy.
6767
// vs e.mysqlContext.ReplicateDoDb: all user assigned db.tb
68-
replicateDoDb []*common.DataSource
69-
dataChannel chan *common.EntryContext
70-
inspector *mysql.Inspector
71-
binlogReader *binlog.BinlogReader
72-
LogMinerStream *LogMinerStream
73-
initialBinlogCoordinates *common.OracleCoordinates
74-
currentBinlogCoordinates *common.OracleCoordinateTx
68+
replicateDoDb []*common.DataSource
69+
dataChannel chan *common.EntryContext
70+
inspector *mysql.Inspector
71+
binlogReader *binlog.BinlogReader
72+
LogMinerStream *LogMinerStream
73+
fullCopyCoordinates *common.OracleCoordinates
7574
//rowCopyComplete chan bool
7675
rowCopyCompleteFlag int64
7776
tableCount int
@@ -207,20 +206,16 @@ func (e *ExtractorOracle) Run() {
207206

208207
e.initDBConnections()
209208

209+
startSCN, committedSCN, err := e.calculateSCNPos()
210+
if err != nil {
211+
e.onError(common.TaskStateDead, errors.Wrap(err, "calculateSCNPos"))
212+
return
213+
}
210214
fullCopy := true
211-
{
212-
startSCN, committedSCN, err := e.calculateSCNPos()
213-
if err != nil {
214-
e.onError(common.TaskStateDead, errors.Wrap(err, "calculateSCNPos"))
215-
return
216-
}
217-
// todo
218-
notFullCopy := true
219-
if notFullCopy {
220-
fullCopy = false
221-
} else if startSCN != 0 || committedSCN != 0 {
222-
fullCopy = false
223-
}
215+
if e.mysqlContext.OracleConfig.Scn != 0 {
216+
fullCopy = false
217+
} else if startSCN != 0 || committedSCN != 0 {
218+
fullCopy = false
224219
}
225220

226221
if fullCopy {
@@ -239,21 +234,19 @@ func (e *ExtractorOracle) Run() {
239234
e.onError(common.TaskStateDead, err)
240235
return
241236
}
237+
}
242238

243-
{
244-
startSCN, committedSCN, err := e.calculateSCNPos()
245-
if err != nil {
246-
e.onError(common.TaskStateDead, errors.Wrap(err, "calculateSCNPos"))
247-
return
248-
}
249-
e.LogMinerStream = NewLogMinerStream(e.oracleDB, e.logger, e.mysqlContext.ReplicateDoDb, e.mysqlContext.ReplicateIgnoreDb,
250-
startSCN, committedSCN, 100000)
251-
e.logger.Debug("start .initiateStreaming before")
252-
if err := e.initiateStreaming(); err != nil {
253-
e.logger.Error("error at initiateStreaming", "err", err)
254-
e.onError(common.TaskStateDead, err)
255-
return
256-
}
239+
{
240+
if startSCN == 0 && committedSCN == 0 {
241+
startSCN, committedSCN = e.oracleDB.SCN, e.oracleDB.SCN
242+
}
243+
e.LogMinerStream = NewLogMinerStream(e.oracleDB, e.logger, e.mysqlContext.ReplicateDoDb, e.mysqlContext.ReplicateIgnoreDb,
244+
startSCN, committedSCN, 100000)
245+
e.logger.Debug("start .initiateStreaming before")
246+
if err := e.initiateStreaming(); err != nil {
247+
e.logger.Error("error at initiateStreaming", "err", err)
248+
e.onError(common.TaskStateDead, err)
249+
return
257250
}
258251
}
259252
}
@@ -839,7 +832,7 @@ func (e *ExtractorOracle) onError(state int, err error) {
839832

840833
func (e *ExtractorOracle) sendFullComplete() (err error) {
841834
dumpMsg, err := common.Encode(&common.DumpStatResult{
842-
Coord: e.initialBinlogCoordinates,
835+
Coord: e.fullCopyCoordinates,
843836
})
844837
if err != nil {
845838
return err
@@ -979,6 +972,9 @@ func (e *ExtractorOracle) oracleDump() error {
979972
}
980973
}
981974
}
975+
e.fullCopyCoordinates = &common.OracleCoordinates{
976+
LaststSCN: e.oracleDB.SCN,
977+
}
982978
return nil
983979
}
984980

0 commit comments

Comments
 (0)