diff --git a/driver/oracle/extractor/log_miner.go b/driver/oracle/extractor/log_miner.go index b9a18fafb..5759ef965 100644 --- a/driver/oracle/extractor/log_miner.go +++ b/driver/oracle/extractor/log_miner.go @@ -590,10 +590,10 @@ func (e *ExtractorOracle) DataStreamEvents(entriesChannel chan<- *common.EntryCo func (e *ExtractorOracle) handleSQLs(tx *LogMinerTx) *common.DataEntry { entry := common.NewBinlogEntry() - entry.Final = true + entry.Final = true oracleCoordinateTx := &common.OracleCoordinateTx{ - OldestUncommittedScn: tx.oldestUncommittedScn, - EndSCN: tx.endScn, + OldestUncommittedScn: tx.oldestUncommittedScn, + EndSCN: tx.endScn, } entry.Coordinates = oracleCoordinateTx for _, row := range tx.records { @@ -925,9 +925,8 @@ func (e *ExtractorOracle) parseToDataEvent(row *LogMinerRecord) (common.DataEven return dataEvent, fmt.Errorf("parese dateEvent fail , operation Code %v", row.Operation) } func (e *ExtractorOracle) parseDMLSQL(oracleRedoSQL, oracleUndoSQL string) (dataEvent common.DataEvent, err error) { - // e.logger.Debug("============= dml stmt parse start===============", "redoSQL", redoSQL, "undoSQL", undoSQL) - // todo 大小写问题 - OracleToMySQL := func(oracleSQL string) string { + // Convert oracle SQL to MySQL format that can be parsed by tidb parese + OracleToMySQLFormat := func(oracleSQL string) string { if strings.HasPrefix(oracleSQL, "insert into") { insertSqlSlice := strings.Split(oracleSQL, ") values (") if len(insertSqlSlice) == 2 { @@ -941,34 +940,34 @@ func (e *ExtractorOracle) parseDMLSQL(oracleRedoSQL, oracleUndoSQL string) (data delSqlSlice := strings.Split(oracleSQL, " where ") if len(delSqlSlice) == 2 { oracleSQL = strings.Replace(oracleSQL, delSqlSlice[0], ReplaceSpecifiedString(delSqlSlice[0], `"`, "`"), 1) - wereExper := delSqlSlice[1] - wereExperSli := strings.Split(wereExper, " and ") - for i := range wereExperSli { - colAndVal := strings.Split(wereExperSli[i], " = ") + whereExper := delSqlSlice[1] + whereExperSli := strings.Split(whereExper, " and ") + for i := range whereExperSli { + colAndVal := strings.Split(whereExperSli[i], " = ") if len(colAndVal) == 2 { - wereExperSli[i] = fmt.Sprintf("%s = %s", ReplaceSpecifiedString(colAndVal[0], `"`, "`"), + whereExperSli[i] = fmt.Sprintf("%s = %s", ReplaceSpecifiedString(colAndVal[0], `"`, "`"), ReplaceSpecifiedString(colAndVal[1], `\`, `\\`)) } } - wereExper = strings.Join(wereExperSli, " and ") - oracleSQL = strings.Replace(oracleSQL, delSqlSlice[1], wereExper, 1) + whereExper = strings.Join(whereExperSli, " and ") + oracleSQL = strings.Replace(oracleSQL, delSqlSlice[1], whereExper, 1) } } else if strings.HasPrefix(oracleSQL, "update") { // update "TEST"."BINARY_FLOAT6" set "COL2" ='500' where "COL1" = '3' and "COL2" = 'NULL'; updateSqlSlice := strings.Split(oracleSQL, " where ") if len(updateSqlSlice) == 2 { // "COL1" = '3' and "COL2" = 'NULL'; - wereExper := updateSqlSlice[1] - wereExperSli := strings.Split(wereExper, " and ") - for i := range wereExperSli { - colAndVal := strings.Split(wereExperSli[i], " = ") + whereExper := updateSqlSlice[1] + whereExperSli := strings.Split(whereExper, " and ") + for i := range whereExperSli { + colAndVal := strings.Split(whereExperSli[i], " = ") if len(colAndVal) == 2 { - wereExperSli[i] = fmt.Sprintf("%s = %s", ReplaceSpecifiedString(colAndVal[0], `"`, "`"), + whereExperSli[i] = fmt.Sprintf("%s = %s", ReplaceSpecifiedString(colAndVal[0], `"`, "`"), ReplaceSpecifiedString(colAndVal[1], `\`, `\\`)) } } - wereExper = strings.Join(wereExperSli, " and ") - updateSqlSlice[1] = wereExper + whereExper = strings.Join(whereExperSli, " and ") + updateSqlSlice[1] = whereExper // update "TEST"."BINARY_FLOAT6" set "COL2" ='500' and "COL1" = 'ss' headerExper := updateSqlSlice[0] @@ -979,7 +978,7 @@ func (e *ExtractorOracle) parseDMLSQL(oracleRedoSQL, oracleUndoSQL string) (data // "COL2" ='500' and "COL1" = 'ss' setExperSli := strings.Split(headExperSli[1], " and ") for i := range setExperSli { - colAndVal := strings.Split(wereExperSli[i], " = ") + colAndVal := strings.Split(setExperSli[i], " = ") setExperSli[i] = fmt.Sprintf("%s = %s", ReplaceSpecifiedString(colAndVal[0], `"`, "`"), ReplaceSpecifiedString(colAndVal[1], `\`, `\\`)) } @@ -999,7 +998,7 @@ func (e *ExtractorOracle) parseDMLSQL(oracleRedoSQL, oracleUndoSQL string) (data } return oracleSQL } - redoSQL := OracleToMySQL(oracleRedoSQL) + redoSQL := OracleToMySQLFormat(oracleRedoSQL) p := parser.New() stmt, _, err := p.Parse(redoSQL, "", "") if err != nil { @@ -1046,7 +1045,7 @@ func (e *ExtractorOracle) parseDMLSQL(oracleRedoSQL, oracleUndoSQL string) (data case common.DeleteDML: dataEvent.Rows = [][]interface{}{visitor.WhereColumnValues} case common.UpdateDML: - undoSQL := OracleToMySQL(oracleUndoSQL) + undoSQL := OracleToMySQLFormat(oracleUndoSQL) undoP := parser.New() untoStmt, _, err := undoP.Parse(undoSQL, "", "") if err != nil { diff --git a/driver/oracle/extractor/log_miner_test.go b/driver/oracle/extractor/log_miner_test.go index a7291be83..be7578705 100644 --- a/driver/oracle/extractor/log_miner_test.go +++ b/driver/oracle/extractor/log_miner_test.go @@ -144,18 +144,19 @@ func TestParseDMLSQL(t *testing.T) { undo_sql string want_rows [][]interface{} }{ + // update SQL + { + name: "TESTNULL", + sql: "update \"TEST\".\"TESTNULL\" set \"COL1\" = NULL where \"COL1\" = 'T'", + undo_sql: "update \"TEST\".\"TESTNULL\" set \"COL1\" = 'T' where \"COL1\" IS NULL", + want_rows: [][]interface{}{{"T", nil}, {nil, nil}}, + }, + // Insert SQL { name: "NCHAR_255_COLUMNS", sql: `insert into "TEST"."NCHAR_255_COLUMNS"("COL1","COL2") values ('11',UNISTR('\6570\636E\5E93sql\6D4B\8BD5\6570\636E\5E93sql\6D4B\8BD5 '))`, undo_sql: ``, want_rows: [][]interface{}{{"11", "数据库sql测试数据库sql测试 "}}}, - // NUMBER(*) - // BFILE no support - // BINARY_FLOAT - // insert into TEST.TEST("COL1","COL2") values (1, 3.40282E+38F); - // insert into TEST.TEST("COL1","COL2") values (2, BINARY_FLOAT_INFINITY); - // insert into TEST.TEST("COL1","COL2") values (3, -BINARY_FLOAT_INFINITY); - // insert into TEST.TEST("COL1","COL2") values (4, BINARY_FLOAT_NAN); { name: "BINARY_FLOAT1", sql: `insert into "TEST"."BINARY_FLOAT1"("COL1","COL2") values ('0', '1.17549E-38F');`, @@ -188,7 +189,7 @@ func TestParseDMLSQL(t *testing.T) { }, { name: "BINARY_FLOAT6", - sql: `update "TEST"."BINARY_FLOAT6" set "COL2" ='500' where "COL1" = '3' and "COL2" = 'NULL';`, + sql: `update "TEST"."BINARY_FLOAT6" set "COL2" = '500' where "COL1" = '3' and "COL2" = 'NULL';`, undo_sql: `update "TEST"."BINARY_FLOAT6" set "COL2" = NULL where "COL1" = '3' and "COL2" = '50\0';`, want_rows: [][]interface{}{{"3", nil}, {"3", "50\\0"}}, }, @@ -198,10 +199,6 @@ func TestParseDMLSQL(t *testing.T) { undo_sql: `insert into "TEST"."BINARY_FLOAT7"("COL1","COL2") VALUES ('4', 'Nan');`, want_rows: [][]interface{}{{"4", nil}}, }, - // insert into "TEST"."DATE_COLUMNS"("COL1","COL2") values ('1',NULL) - // insert into "TEST"."DATE_COLUMNS"("COL1","COL2") values ('2',TO_DATE('-4712-01-01 00:00:00', 'SYYYY-MM-DD HH24:MI:SS')) - // insert into "TEST"."DATE_COLUMNS"("COL1","COL2") values ('3',TO_DATE(' 9999-12-31 00:00:00', 'SYYYY-MM-DD HH24:MI:SS')) - // insert into "TEST"."DATE_COLUMNS"("COL1","COL2") values ('4',TO_DATE(' 2003-05-03 21:02:44', 'SYYYY-MM-DD HH24:MI:SS')) { name: "DATE_COLUMNS", sql: `insert into "TEST"."DATE_COLUMNS"("COL1","COL2") values ('1',NULL)`, @@ -226,7 +223,6 @@ func TestParseDMLSQL(t *testing.T) { undo_sql: ``, want_rows: [][]interface{}{{"4", " 2003-05-03 21:02:44"}}, }, - // CREATE TABLE TEST."te\shu"(COL1 INT, col2 CHAR(256)) { name: `te\shu`, sql: `insert into "TEST"."te\shu"("COL1","COL2") values ('5','x\x44')`, @@ -245,11 +241,6 @@ func TestParseDMLSQL(t *testing.T) { undo_sql: `insert into "TEST"."BINARY_FLOAT"("COL1","COL2") VALUES ('5', 'Nan');`, want_rows: [][]interface{}{{"5", `"`}}, }, - // { - // name: "NCHAR_255_COLUMNS", - // sql: `insert into "TEST"."NCHAR_255_COLUMNS"("COL1","COL2") values ('9',UNISTR('\6570\636E\5E93sql\6D4B\8BD5'))`, - // undo_sql: ``, - // want_rows: [][]interface{}{{"9", "数据库sql测试"}}}, { name: "CHAR_255_COLUMNS2", sql: `insert into "TEST"."CHAR_255_COLUMNS2"("COL1","COL2") values ('16','"')`, @@ -521,34 +512,6 @@ func TestParseDDLSQL(t *testing.T) { name: "createTableSQLCharRelation", sql: `CREATE TABLE TEST.XMLTYPE_COLUMNS(ID INT, C_XMLTYPE XMLTYPE);`, want: "CREATE TABLE `TEST`.`XMLTYPE_COLUMNS` (`ID` INT,`C_XMLTYPE` LONGTEXT) DEFAULT CHARACTER SET = UTF8MB4"}, - - // { - // name: "createTableSQLCharRelation", - // sql: `CREATE TABLE test."persons"( - // "first_name" VARCHAR(15) NOT NULL, - // last_name VARCHAR2(45) NOT NULL - // );`, - // want: "CREATE TABLE `TEST`.`persons (first_name VARCHAR(15),LAST_NAME VARCHAR(45)) DEFAULT CHARACTER SET = UTF8MB4"}, - // { - // // 是否支持没有 p s,或者仅仅支持 p s - // //NUMERIC_NAME NUMERIC(15,2), - // //Decimal_NAME DECIMAL(15,2), - // //Dec_NAME DEC(15,2), - // // ps 都为空时候,oracle上限为 38,0 mysql上限为10,0 - // name: "createTableSQLNumberRelation", - // sql: `CREATE TABLE test."persons"( - // "first_num" NUMBER(15,2) NOT NULL, - // second_num NUMBER(10) NOT NULL, - // three_num NUMBER(5,0) NOT NULL, - // last_name NUMBER NOT NULL, - // NUMERIC_NAME NUMERIC(15,2), - // Decimal_NAME DECIMAL(15,2), - // Dec_NAME DEC(15,2), - // INTEGER_NAME INTEGER, - // INT_NAME INT, - // SMALLINT_NAME SMALLINT - // );`, - // want: "CREATE TABLE `TEST`.`persons (first_num DECIMAL(15,2),SECOND_NUM BIGINT,THREE_NUM INT,LAST_NAME DOUBLE,NUMERIC_NAME NUMERIC(15,2),DECIMAL_NAME DECIMAL(15,2),DEC_NAME DEC(15,2),INTEGER_NAME INT,INT_NAME INT,SMALLINT_NAME DECIMAL(38)) DEFAULT CHARACTER SET = UTF8MB4"}, } logger := hclog.NewNullLogger() extractor := &ExtractorOracle{logger: logger, replicateDoDb: []*common.DataSource{}} @@ -736,11 +699,6 @@ func TestParseConstraintSQL(t *testing.T) { )`, want: "CREATE TABLE `TEST`.`EMPLOYEES_DEMO` (`EMPLOYEE_ID` INT,`FIRST_NAME` VARCHAR(20),`LAST_NAME` VARCHAR(25) NOT NULL,`EMAIL` VARCHAR(25) NOT NULL,`PHONE_NUMBER` VARCHAR(20),`HIRE_DATE` DATETIME NOT NULL,`JOB_ID` VARCHAR(10) NOT NULL,`SALARY` DECIMAL(8,2) NOT NULL,`COMMISSION_PCT` DECIMAL(2,2),`MANAGER_ID` INT,`DEPARTMENT_ID` SMALLINT,`DN` VARCHAR(300),UNIQUE `EMP_EMAIL_UK_DEMO`(`email`)) DEFAULT CHARACTER SET = UTF8MB4", }, - // { - // name: "createOutOfLineConstraint", - // sql: `CREATE TABLE TEST.emp1 ( id number REFERENCES TEST.USERINFO11 ( ID ), NAME VARCHAR ( 8 ) );`, - // want: "CREATE TABLE `TEST`.`EMP1` (`ID` DOUBLE,`NAME` VARCHAR(8)) DEFAULT CHARACTER SET = UTF8MB4", - // }, } logger := hclog.NewNullLogger() extractor := &ExtractorOracle{logger: logger, replicateDoDb: []*common.DataSource{}}