Skip to content

Commit ef66dec

Browse files
committed
prepare for oracle dump
1 parent 40fbcbe commit ef66dec

File tree

1 file changed

+152
-29
lines changed

1 file changed

+152
-29
lines changed

driver/oracle/extractor/extractor_oracle.go

Lines changed: 152 additions & 29 deletions
Original file line numberDiff line numberDiff line change
@@ -203,39 +203,50 @@ func (e *ExtractorOracle) Run() {
203203
return
204204
}
205205

206-
startSCN, committedSCN, err := e.calculateSCNPos()
207-
if err != nil {
208-
e.onError(common.TaskStateDead, errors.Wrap(err, "calculateSCNPos"))
209-
return
206+
e.initDBConnections()
207+
208+
fullCopy := true
209+
{
210+
startSCN, committedSCN, err := e.calculateSCNPos()
211+
if err != nil {
212+
e.onError(common.TaskStateDead, errors.Wrap(err, "calculateSCNPos"))
213+
return
214+
}
215+
// todo
216+
notFullCopy := true
217+
if notFullCopy {
218+
fullCopy = false
219+
} else if startSCN == 0 && committedSCN == 0 {
220+
fullCopy = false
221+
}
210222
}
211223

212-
e.initDBConnections()
213-
e.getSchemaTablesAndMeta()
214-
215-
e.LogMinerStream = NewLogMinerStream(e.oracleDB, e.logger, e.mysqlContext.ReplicateDoDb, e.mysqlContext.ReplicateIgnoreDb,
216-
startSCN, committedSCN, 100000)
217-
//e.logger.Info("CheckAndApplyLowerCaseTableNames")
218-
//e.CheckAndApplyLowerCaseTableNames()
219-
// 字符集同步 todo
220-
fullCopy := false
221224
if fullCopy {
222-
e.logger.Debug("mysqlDump. before")
223-
} else { // no full copy
224-
// Will not get consistent table meta-info for an incremental only job.
225-
// https://github.com/actiontech/dtle/issues/321#issuecomment-441191534
226-
// 获取需要同步的表结构数据
227-
//if err := e.getSchemaTablesAndMeta(); err != nil {
228-
// e.onError(common.TaskStateDead, err)
229-
// return
230-
//}
231-
}
232-
//err = e.sendFullComplete()
225+
e.logger.Debug("oracleDump. before")
226+
if err := e.oracleDump(); err != nil {
227+
e.onError(common.TaskStateDead, err)
228+
return
229+
}
230+
err = e.sendFullComplete()
231+
if err != nil {
232+
e.onError(common.TaskStateDead, errors.Wrap(err, "sendFullComplete"))
233+
return
234+
}
235+
} else {
236+
if err := e.getSchemaTablesAndMeta(); err != nil {
237+
e.onError(common.TaskStateDead, err)
238+
return
239+
}
240+
}
241+
233242
{
234-
//if err != nil {
235-
// e.logger.Error("error after streamerReadyCh", "err", err)
236-
// e.onError(common.TaskStateDead, err)
237-
// return
238-
//}
243+
startSCN, committedSCN, err := e.calculateSCNPos()
244+
if err != nil {
245+
e.onError(common.TaskStateDead, errors.Wrap(err, "calculateSCNPos"))
246+
return
247+
}
248+
e.LogMinerStream = NewLogMinerStream(e.oracleDB, e.logger, e.mysqlContext.ReplicateDoDb, e.mysqlContext.ReplicateIgnoreDb,
249+
startSCN, committedSCN, 100000)
239250
e.logger.Debug("start .initiateStreaming before")
240251
if err := e.initiateStreaming(); err != nil {
241252
e.logger.Error("error at initiateStreaming", "err", err)
@@ -851,3 +862,115 @@ func (e *ExtractorOracle) CheckAndApplyLowerCaseTableNames() {
851862
lowerConfigItem(e.mysqlContext.ReplicateIgnoreDb)
852863
}
853864
}
865+
866+
func (e *ExtractorOracle) oracleDump() error {
867+
// step 1 : todo lock row
868+
// query : lock table schema.table in row share mode;
869+
if err := e.getSchemaTablesAndMeta(); err != nil {
870+
e.onError(common.TaskStateDead, err)
871+
return err
872+
}
873+
874+
// step 2 : get current scn for d.Dump()
875+
_, err := (&LogMinerStream{oracleDB: e.oracleDB}).GetCurrentSnapshotSCN()
876+
if err != nil {
877+
return err
878+
}
879+
880+
// step 3 : defer unlock row
881+
882+
// step 4 : create table ddl
883+
if !e.mysqlContext.SkipCreateDbTable {
884+
e.logger.Info("generating DROP and CREATE statements to reflect current database schemas",
885+
"replicateDoDb", e.replicateDoDb)
886+
887+
for _, db := range e.replicateDoDb {
888+
var dbSQL string
889+
// rename schema
890+
if db.TableSchemaRename != "" {
891+
dbSQL = fmt.Sprintf("CREATE SCHEMA IF NOT EXISTS %s", mysqlconfig.EscapeName(db.TableSchemaRename))
892+
} else {
893+
dbSQL = fmt.Sprintf("CREATE SCHEMA IF NOT EXISTS %s", mysqlconfig.EscapeName(db.TableSchema))
894+
}
895+
896+
entry := &common.DumpEntry{
897+
DbSQL: dbSQL,
898+
}
899+
if err := e.encodeAndSendDumpEntry(entry); err != nil {
900+
e.onError(common.TaskStateRestart, err)
901+
}
902+
903+
for _, tb := range db.Tables {
904+
tbSQL := make([]string, 0)
905+
906+
targetSchema := g.StringElse(db.TableSchemaRename, tb.TableSchema)
907+
targetTable := g.StringElse(tb.TableRename, tb.TableName)
908+
if e.mysqlContext.DropTableIfExists {
909+
tbSQL = append(tbSQL, fmt.Sprintf("DROP TABLE IF EXISTS %s.%s",
910+
mysqlconfig.EscapeName(targetSchema), mysqlconfig.EscapeName(targetTable)))
911+
}
912+
// Get the specified table's create DDL statement.
913+
createTbSQL, err := e.oracleDB.GetTableDDL(tb.TableSchema, tb.TableName)
914+
if err != nil {
915+
return err
916+
}
917+
918+
// parse ddl
919+
for i := range createTbSQL {
920+
dataEvent, err := e.parseDDLSQL(entry.TbSQL[i], tb.TableSchema)
921+
if err != nil {
922+
return err
923+
}
924+
tbSQL = append(tbSQL, dataEvent.Query)
925+
}
926+
927+
entry := &common.DumpEntry{
928+
TbSQL: tbSQL,
929+
// TotalCount: tb.Counter,
930+
}
931+
if err := e.encodeAndSendDumpEntry(entry); err != nil {
932+
e.onError(common.TaskStateRestart, err)
933+
}
934+
}
935+
}
936+
}
937+
// step 5: Dump all of the tables and generate source records ...
938+
// todo need merged dumper with mysql dumper
939+
// for _, db := range e.replicateDoDb {
940+
// for _, t := range db.Tables {
941+
// d := NewDumper(e.oracleDB, t, e.mysqlContext.ChunkSize, e.logger.ResetNamed("dumper"), e.memory1)
942+
// if err := d.Dump(); err != nil {
943+
// e.onError(common.TaskStateDead, err)
944+
// }
945+
// // todo close when shutdown
946+
// // e.dumpers = append(e.dumpers, d)
947+
948+
// // Scan the rows in the table ...
949+
// for entry := range d.resultsChannel {
950+
// if entry.Err != "" {
951+
// e.onError(common.TaskStateDead, fmt.Errorf(entry.Err))
952+
// } else {
953+
// if err := e.encodeAndSendDumpEntry(entry); err != nil {
954+
// e.onError(common.TaskStateRestart, err)
955+
// }
956+
// }
957+
// }
958+
// }
959+
// }
960+
return nil
961+
}
962+
963+
func (e *ExtractorOracle) encodeAndSendDumpEntry(entry *common.DumpEntry) error {
964+
bs, err := entry.Marshal(nil)
965+
if err != nil {
966+
return err
967+
}
968+
txMsg, err := common.Compress(bs)
969+
if err != nil {
970+
return errors.Wrap(err, "common.Compress")
971+
}
972+
if err := e.publish(fmt.Sprintf("%s_full", e.subject), txMsg, 0); err != nil {
973+
return err
974+
}
975+
return nil
976+
}

0 commit comments

Comments
 (0)