Skip to content

Commit 1d42c42

Browse files
committed
get full table data as of scn
1 parent 6508edb commit 1d42c42

File tree

3 files changed

+119
-25
lines changed

3 files changed

+119
-25
lines changed

driver/mysql/applier.go

Lines changed: 3 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -327,8 +327,9 @@ func (a *Applier) Run() {
327327
}
328328
a.ai.OnError = a.onError
329329

330-
go a.updateGtidLoop()
331-
330+
if sourceType == "mysql" {
331+
go a.updateGtidLoop()
332+
}
332333
if a.stage != JobFullCopy {
333334
a.stage = JobFullCopy
334335
a.sendEvent(JobFullCopy)

driver/oracle/extractor/dumper.go

Lines changed: 100 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -1,6 +1,11 @@
11
package extractor
22

33
import (
4+
"context"
5+
"fmt"
6+
"sync/atomic"
7+
"time"
8+
49
"github.com/actiontech/dtle/driver/common"
510
"github.com/actiontech/dtle/driver/oracle/config"
611

@@ -9,15 +14,20 @@ import (
914

1015
type dumper struct {
1116
*common.Dumper
12-
db *config.OracleDB
17+
db *config.OracleDB
18+
snapshotSCN int64
1319
}
1420

1521
func NewDumper(db *config.OracleDB, table *common.Table, chunkSize int64,
16-
logger g.LoggerType, memory *int64) *dumper {
17-
return &dumper{
22+
logger g.LoggerType, memory *int64, scn int64) *dumper {
23+
d := &dumper{
1824
common.NewDumper(table, chunkSize, logger, memory),
1925
db,
26+
scn,
2027
}
28+
d.PrepareForDumping = d.prepareForDumping
29+
d.GetChunkData = d.getChunkData
30+
return d
2131
}
2232

2333
func (d *dumper) prepareForDumping() error {
@@ -39,12 +49,95 @@ func (d *dumper) getChunkData() (nRows int64, err error) {
3949
entry.TableSchema = d.Table.TableSchemaRename
4050
}
4151

42-
// build query sql (SELECT * FROM x.x LIMIT x OFFSET x AS OF SCN 123)
52+
// build query sql
53+
// SELECT * FROM %s.%s AS OF SCN %d where ROWNUM <= %d
54+
// minus
55+
// SELECT * FROM %s.%s AS OF SCN %d where ROWNUM < %d
56+
defer func() {
57+
if err != nil {
58+
entry.Err = err.Error()
59+
}
60+
if err == nil && len(entry.ValuesX) == 0 {
61+
return
62+
}
63+
64+
keepGoing := true
65+
timer := time.NewTicker(time.Second)
66+
defer timer.Stop()
67+
for keepGoing {
68+
select {
69+
case <-d.ShutdownCh:
70+
keepGoing = false
71+
case d.ResultsChannel <- entry:
72+
atomic.AddInt64(d.Memory, int64(entry.Size()))
73+
//d.logger.Debug("*** memory", "memory", atomic.LoadInt64(d.memory))
74+
keepGoing = false
75+
case <-timer.C:
76+
d.Logger.Debug("resultsChannel full. waiting and ping conn")
77+
errPing := d.db.MetaDataConn.PingContext(context.TODO())
78+
if errPing != nil {
79+
d.Logger.Debug("ping query row got error.", "err", errPing)
80+
}
81+
}
82+
}
83+
d.Logger.Debug("resultsChannel", "n", len(d.ResultsChannel))
84+
}()
85+
86+
query := fmt.Sprintf(
87+
`SELECT * FROM %s.%s AS OF SCN %d where ROWNUM <= %d
88+
minus
89+
SELECT * FROM %s.%s AS OF SCN %d where ROWNUM < %d `,
90+
d.TableSchema,
91+
d.TableName,
92+
d.snapshotSCN,
93+
(d.Iteration+1)*d.ChunkSize,
94+
d.TableSchema,
95+
d.TableName,
96+
d.snapshotSCN,
97+
(d.Iteration*d.ChunkSize)+1,
98+
)
4399

44-
// offset
100+
d.Logger.Debug("getChunkData.", "query", query)
101+
102+
// this must be increased after building query
45103
d.Iteration += 1
46-
// exec sql
47-
// d.db.MetaDataConn.QueryRowContext()
104+
rows, err := d.db.MetaDataConn.QueryContext(context.TODO(), query)
105+
if err != nil {
106+
newErr := fmt.Errorf("error at select chunk. err: %v", err)
107+
d.Logger.Error(newErr.Error())
108+
return 0, err
109+
}
110+
111+
columns, err := rows.Columns()
112+
if err != nil {
113+
return 0, err
114+
}
115+
116+
scanArgs := make([]interface{}, len(columns)) // tmp use, for casting `values` to `[]interface{}`
117+
118+
for rows.Next() {
119+
rowValuesRaw := make([]*[]byte, len(columns))
120+
for i := range rowValuesRaw {
121+
scanArgs[i] = &rowValuesRaw[i]
122+
}
123+
124+
err = rows.Scan(scanArgs...)
125+
if err != nil {
126+
return 0, err
127+
}
128+
129+
entry.ValuesX = append(entry.ValuesX, rowValuesRaw)
130+
}
131+
132+
nRows = int64(len(entry.ValuesX))
133+
d.Logger.Debug("getChunkData.", "n_row", nRows)
134+
135+
if d.Table.TableRename != "" {
136+
entry.TableName = d.Table.TableRename
137+
}
138+
if d.Table.TableSchemaRename != "" {
139+
entry.TableSchema = d.Table.TableSchemaRename
140+
}
48141

49142
return nRows, nil
50143
}

driver/oracle/extractor/extractor_oracle.go

Lines changed: 16 additions & 16 deletions
Original file line numberDiff line numberDiff line change
@@ -237,21 +237,21 @@ func (e *ExtractorOracle) Run() {
237237
e.onError(common.TaskStateDead, err)
238238
return
239239
}
240-
}
241240

242-
{
243-
startSCN, committedSCN, err := e.calculateSCNPos()
244-
if err != nil {
245-
e.onError(common.TaskStateDead, errors.Wrap(err, "calculateSCNPos"))
246-
return
247-
}
248-
e.LogMinerStream = NewLogMinerStream(e.oracleDB, e.logger, e.mysqlContext.ReplicateDoDb, e.mysqlContext.ReplicateIgnoreDb,
249-
startSCN, committedSCN, 100000)
250-
e.logger.Debug("start .initiateStreaming before")
251-
if err := e.initiateStreaming(); err != nil {
252-
e.logger.Error("error at initiateStreaming", "err", err)
253-
e.onError(common.TaskStateDead, err)
254-
return
241+
{
242+
startSCN, committedSCN, err := e.calculateSCNPos()
243+
if err != nil {
244+
e.onError(common.TaskStateDead, errors.Wrap(err, "calculateSCNPos"))
245+
return
246+
}
247+
e.LogMinerStream = NewLogMinerStream(e.oracleDB, e.logger, e.mysqlContext.ReplicateDoDb, e.mysqlContext.ReplicateIgnoreDb,
248+
startSCN, committedSCN, 100000)
249+
e.logger.Debug("start .initiateStreaming before")
250+
if err := e.initiateStreaming(); err != nil {
251+
e.logger.Error("error at initiateStreaming", "err", err)
252+
e.onError(common.TaskStateDead, err)
253+
return
254+
}
255255
}
256256
}
257257
}
@@ -872,7 +872,7 @@ func (e *ExtractorOracle) oracleDump() error {
872872
}
873873

874874
// step 2 : get current scn for d.Dump()
875-
_, err := (&LogMinerStream{oracleDB: e.oracleDB}).GetCurrentSnapshotSCN()
875+
currentSCN, err := (&LogMinerStream{oracleDB: e.oracleDB}).GetCurrentSnapshotSCN()
876876
if err != nil {
877877
return err
878878
}
@@ -935,7 +935,7 @@ func (e *ExtractorOracle) oracleDump() error {
935935
// todo need merged dumper with mysql dumper
936936
for _, db := range e.replicateDoDb {
937937
for _, t := range db.Tables {
938-
d := NewDumper(e.oracleDB, t, e.mysqlContext.ChunkSize, e.logger.ResetNamed("dumper"), e.memory1)
938+
d := NewDumper(e.oracleDB, t, e.mysqlContext.ChunkSize, e.logger.ResetNamed("dumper"), e.memory1, currentSCN)
939939
if err := d.Dump(); err != nil {
940940
e.onError(common.TaskStateDead, err)
941941
}

0 commit comments

Comments
 (0)