Skip to content

Commit f13e5a1

Browse files
committed
prepare for oracle dump
1 parent 3fcbcb7 commit f13e5a1

File tree

2 files changed

+291
-29
lines changed

2 files changed

+291
-29
lines changed

driver/oracle/extractor/dumper.go

Lines changed: 140 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,140 @@
1+
package extractor
2+
3+
import (
4+
"strings"
5+
6+
"github.com/actiontech/dtle/driver/common"
7+
"github.com/actiontech/dtle/driver/oracle/config"
8+
9+
"github.com/actiontech/dtle/g"
10+
)
11+
12+
type dumper struct {
13+
logger g.LoggerType
14+
chunkSize int64
15+
TableSchema string
16+
TableName string
17+
table *common.Table
18+
columns string
19+
iteration int64
20+
resultsChannel chan *common.DumpEntry
21+
shutdown bool
22+
shutdownCh chan struct{}
23+
24+
db *config.OracleDB
25+
26+
memory *int64
27+
}
28+
29+
func NewDumper(db *config.OracleDB, table *common.Table, chunkSize int64,
30+
logger g.LoggerType, memory *int64) *dumper {
31+
dumper := &dumper{
32+
logger: logger,
33+
db: db,
34+
TableSchema: table.TableSchema,
35+
TableName: table.TableName,
36+
table: table,
37+
resultsChannel: make(chan *common.DumpEntry, 24),
38+
chunkSize: chunkSize,
39+
shutdownCh: make(chan struct{}),
40+
memory: memory,
41+
}
42+
return dumper
43+
}
44+
45+
type DumpEntryOrig struct {
46+
SystemVariablesStatement string
47+
SqlMode string
48+
DbSQL string
49+
TableName string
50+
TableSchema string
51+
TbSQL []string
52+
// For each `*interface{}` item, it is ensured to be not nil.
53+
// If field is sql-NULL, *item is nil. Else, *item is a `[]byte`.
54+
// TODO can we just use interface{}? Make sure it is not copied again and again.
55+
ValuesX [][]*interface{}
56+
TotalCount int64
57+
RowsCount int64
58+
Err error
59+
Table *common.Table
60+
}
61+
62+
func (d *dumper) prepareForDumping() error {
63+
needPm := false
64+
columns := make([]string, 0)
65+
66+
if needPm {
67+
d.columns = strings.Join(columns, ", ")
68+
} else {
69+
d.columns = "*"
70+
}
71+
72+
return nil
73+
}
74+
75+
// dumps a specific chunk, reading chunk info from the channel
76+
func (d *dumper) getChunkData() (nRows int64, err error) {
77+
entry := &common.DumpEntry{
78+
TableSchema: d.TableSchema,
79+
TableName: d.TableName,
80+
}
81+
82+
if d.table.TableRename != "" {
83+
entry.TableName = d.table.TableRename
84+
}
85+
if d.table.TableSchemaRename != "" {
86+
entry.TableSchema = d.table.TableSchemaRename
87+
}
88+
89+
// build query sql (SELECT * FROM x.x LIMIT x OFFSET x AS OF SCN 123)
90+
91+
// offset
92+
d.iteration += 1
93+
// exec sql
94+
// d.db.MetaDataConn.QueryRowContext()
95+
96+
return nRows, nil
97+
}
98+
99+
func (d *dumper) Dump() error {
100+
err := d.prepareForDumping()
101+
if err != nil {
102+
return err
103+
}
104+
105+
go func() {
106+
defer close(d.resultsChannel)
107+
for {
108+
select {
109+
case <-d.shutdownCh:
110+
return
111+
default:
112+
}
113+
114+
nRows, err := d.getChunkData()
115+
if err != nil {
116+
d.logger.Error("error at dump", "err", err)
117+
break
118+
}
119+
120+
if nRows < d.chunkSize {
121+
d.logger.Info("nRows < d.chunkSize.", "nRows", nRows, "chunkSize", d.chunkSize)
122+
}
123+
if nRows == 0 {
124+
d.logger.Info("nRows == 0. dump finished.", "nRows", nRows, "chunkSize", d.chunkSize)
125+
break
126+
}
127+
}
128+
}()
129+
130+
return nil
131+
}
132+
133+
func (d *dumper) Close() error {
134+
if d.shutdown {
135+
return nil
136+
}
137+
d.shutdown = true
138+
close(d.shutdownCh)
139+
return nil
140+
}

driver/oracle/extractor/extractor_oracle.go

Lines changed: 151 additions & 29 deletions
Original file line numberDiff line numberDiff line change
@@ -203,39 +203,50 @@ func (e *ExtractorOracle) Run() {
203203
return
204204
}
205205

206-
startSCN, committedSCN, err := e.calculateSCNPos()
207-
if err != nil {
208-
e.onError(common.TaskStateDead, errors.Wrap(err, "calculateSCNPos"))
209-
return
206+
e.initDBConnections()
207+
208+
fullCopy := true
209+
{
210+
startSCN, committedSCN, err := e.calculateSCNPos()
211+
if err != nil {
212+
e.onError(common.TaskStateDead, errors.Wrap(err, "calculateSCNPos"))
213+
return
214+
}
215+
// todo
216+
notFullCopy := true
217+
if notFullCopy {
218+
fullCopy = false
219+
} else if startSCN == 0 && committedSCN == 0 {
220+
fullCopy = false
221+
}
210222
}
211223

212-
e.initDBConnections()
213-
e.getSchemaTablesAndMeta()
214-
215-
e.LogMinerStream = NewLogMinerStream(e.oracleDB, e.logger, e.mysqlContext.ReplicateDoDb, e.mysqlContext.ReplicateIgnoreDb,
216-
startSCN, committedSCN, 100000)
217-
//e.logger.Info("CheckAndApplyLowerCaseTableNames")
218-
//e.CheckAndApplyLowerCaseTableNames()
219-
// 字符集同步 todo
220-
fullCopy := false
221224
if fullCopy {
222-
e.logger.Debug("mysqlDump. before")
223-
} else { // no full copy
224-
// Will not get consistent table meta-info for an incremental only job.
225-
// https://github.com/actiontech/dtle/issues/321#issuecomment-441191534
226-
// 获取需要同步的表结构数据
227-
//if err := e.getSchemaTablesAndMeta(); err != nil {
228-
// e.onError(common.TaskStateDead, err)
229-
// return
230-
//}
231-
}
232-
//err = e.sendFullComplete()
225+
e.logger.Debug("oracleDump. before")
226+
if err := e.oracleDump(); err != nil {
227+
e.onError(common.TaskStateDead, err)
228+
return
229+
}
230+
err = e.sendFullComplete()
231+
if err != nil {
232+
e.onError(common.TaskStateDead, errors.Wrap(err, "sendFullComplete"))
233+
return
234+
}
235+
} else {
236+
if err := e.getSchemaTablesAndMeta(); err != nil {
237+
e.onError(common.TaskStateDead, err)
238+
return
239+
}
240+
}
241+
233242
{
234-
//if err != nil {
235-
// e.logger.Error("error after streamerReadyCh", "err", err)
236-
// e.onError(common.TaskStateDead, err)
237-
// return
238-
//}
243+
startSCN, committedSCN, err := e.calculateSCNPos()
244+
if err != nil {
245+
e.onError(common.TaskStateDead, errors.Wrap(err, "calculateSCNPos"))
246+
return
247+
}
248+
e.LogMinerStream = NewLogMinerStream(e.oracleDB, e.logger, e.mysqlContext.ReplicateDoDb, e.mysqlContext.ReplicateIgnoreDb,
249+
startSCN, committedSCN, 100000)
239250
e.logger.Debug("start .initiateStreaming before")
240251
if err := e.initiateStreaming(); err != nil {
241252
e.logger.Error("error at initiateStreaming", "err", err)
@@ -851,3 +862,114 @@ func (e *ExtractorOracle) CheckAndApplyLowerCaseTableNames() {
851862
lowerConfigItem(e.mysqlContext.ReplicateIgnoreDb)
852863
}
853864
}
865+
866+
func (e *ExtractorOracle) oracleDump() error {
867+
// step 1 : todo lock row
868+
// query : lock table schema.table in row share mode;
869+
if err := e.getSchemaTablesAndMeta(); err != nil {
870+
e.onError(common.TaskStateDead, err)
871+
return err
872+
}
873+
874+
// step 2 : get current scn for d.Dump()
875+
_, err := (&LogMinerStream{oracleDB: e.oracleDB}).GetCurrentSnapshotSCN()
876+
if err != nil {
877+
return err
878+
}
879+
880+
// step 3 : defer unlock row
881+
882+
// step 4 : create table ddl
883+
if !e.mysqlContext.SkipCreateDbTable {
884+
e.logger.Info("generating DROP and CREATE statements to reflect current database schemas",
885+
"replicateDoDb", e.replicateDoDb)
886+
887+
for _, db := range e.replicateDoDb {
888+
var dbSQL string
889+
// rename schema
890+
if db.TableSchemaRename != "" {
891+
dbSQL = fmt.Sprintf("CREATE SCHEMA IF NOT EXISTS %s", mysqlconfig.EscapeName(db.TableSchemaRename))
892+
} else {
893+
dbSQL = fmt.Sprintf("CREATE SCHEMA IF NOT EXISTS %s", mysqlconfig.EscapeName(db.TableSchema))
894+
}
895+
896+
entry := &common.DumpEntry{
897+
DbSQL: dbSQL,
898+
}
899+
if err := e.encodeAndSendDumpEntry(entry); err != nil {
900+
e.onError(common.TaskStateRestart, err)
901+
}
902+
903+
for _, tb := range db.Tables {
904+
tbSQL := make([]string, 0)
905+
906+
targetSchema := g.StringElse(db.TableSchemaRename, tb.TableSchema)
907+
targetTable := g.StringElse(tb.TableRename, tb.TableName)
908+
if e.mysqlContext.DropTableIfExists {
909+
tbSQL = append(tbSQL, fmt.Sprintf("DROP TABLE IF EXISTS %s.%s",
910+
mysqlconfig.EscapeName(targetSchema), mysqlconfig.EscapeName(targetTable)))
911+
}
912+
// Get the specified table's create DDL statement.
913+
createTbSQL, err := e.oracleDB.GetTableDDL(tb.TableSchema, tb.TableName)
914+
if err != nil {
915+
return err
916+
}
917+
918+
// parse ddl
919+
for i := range createTbSQL {
920+
dataEvent, err := e.parseDDLSQL(entry.TbSQL[i], tb.TableSchema)
921+
if err != nil {
922+
return err
923+
}
924+
tbSQL = append(tbSQL, dataEvent.Query)
925+
}
926+
927+
entry := &common.DumpEntry{
928+
TbSQL: tbSQL,
929+
// TotalCount: tb.Counter,
930+
}
931+
if err := e.encodeAndSendDumpEntry(entry); err != nil {
932+
e.onError(common.TaskStateRestart, err)
933+
}
934+
}
935+
}
936+
}
937+
// step 5: Dump all of the tables and generate source records ...
938+
for _, db := range e.replicateDoDb {
939+
for _, t := range db.Tables {
940+
d := NewDumper(e.oracleDB, t, e.mysqlContext.ChunkSize, e.logger.ResetNamed("dumper"), e.memory1)
941+
if err := d.Dump(); err != nil {
942+
e.onError(common.TaskStateDead, err)
943+
}
944+
// todo close when shutdown
945+
// e.dumpers = append(e.dumpers, d)
946+
947+
// Scan the rows in the table ...
948+
for entry := range d.resultsChannel {
949+
if entry.Err != "" {
950+
e.onError(common.TaskStateDead, fmt.Errorf(entry.Err))
951+
} else {
952+
if err := e.encodeAndSendDumpEntry(entry); err != nil {
953+
e.onError(common.TaskStateRestart, err)
954+
}
955+
}
956+
}
957+
}
958+
}
959+
return nil
960+
}
961+
962+
func (e *ExtractorOracle) encodeAndSendDumpEntry(entry *common.DumpEntry) error {
963+
bs, err := entry.Marshal(nil)
964+
if err != nil {
965+
return err
966+
}
967+
txMsg, err := common.Compress(bs)
968+
if err != nil {
969+
return errors.Wrap(err, "common.Compress")
970+
}
971+
if err := e.publish(fmt.Sprintf("%s_full", e.subject), txMsg, 0); err != nil {
972+
return err
973+
}
974+
return nil
975+
}

0 commit comments

Comments
 (0)