Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
45 changes: 22 additions & 23 deletions driver/oracle/extractor/log_miner.go
Original file line number Diff line number Diff line change
Expand Up @@ -590,10 +590,10 @@ func (e *ExtractorOracle) DataStreamEvents(entriesChannel chan<- *common.EntryCo

func (e *ExtractorOracle) handleSQLs(tx *LogMinerTx) *common.DataEntry {
entry := common.NewBinlogEntry()
entry.Final = true
entry.Final = true
oracleCoordinateTx := &common.OracleCoordinateTx{
OldestUncommittedScn: tx.oldestUncommittedScn,
EndSCN: tx.endScn,
OldestUncommittedScn: tx.oldestUncommittedScn,
EndSCN: tx.endScn,
}
entry.Coordinates = oracleCoordinateTx
for _, row := range tx.records {
Expand Down Expand Up @@ -925,9 +925,8 @@ func (e *ExtractorOracle) parseToDataEvent(row *LogMinerRecord) (common.DataEven
return dataEvent, fmt.Errorf("parese dateEvent fail , operation Code %v", row.Operation)
}
func (e *ExtractorOracle) parseDMLSQL(oracleRedoSQL, oracleUndoSQL string) (dataEvent common.DataEvent, err error) {
// e.logger.Debug("============= dml stmt parse start===============", "redoSQL", redoSQL, "undoSQL", undoSQL)
// todo 大小写问题
OracleToMySQL := func(oracleSQL string) string {
// Convert oracle SQL to MySQL format that can be parsed by tidb parese
OracleToMySQLFormat := func(oracleSQL string) string {
if strings.HasPrefix(oracleSQL, "insert into") {
insertSqlSlice := strings.Split(oracleSQL, ") values (")
if len(insertSqlSlice) == 2 {
Expand All @@ -941,34 +940,34 @@ func (e *ExtractorOracle) parseDMLSQL(oracleRedoSQL, oracleUndoSQL string) (data
delSqlSlice := strings.Split(oracleSQL, " where ")
if len(delSqlSlice) == 2 {
oracleSQL = strings.Replace(oracleSQL, delSqlSlice[0], ReplaceSpecifiedString(delSqlSlice[0], `"`, "`"), 1)
wereExper := delSqlSlice[1]
wereExperSli := strings.Split(wereExper, " and ")
for i := range wereExperSli {
colAndVal := strings.Split(wereExperSli[i], " = ")
whereExper := delSqlSlice[1]
whereExperSli := strings.Split(whereExper, " and ")
for i := range whereExperSli {
colAndVal := strings.Split(whereExperSli[i], " = ")
if len(colAndVal) == 2 {
wereExperSli[i] = fmt.Sprintf("%s = %s", ReplaceSpecifiedString(colAndVal[0], `"`, "`"),
whereExperSli[i] = fmt.Sprintf("%s = %s", ReplaceSpecifiedString(colAndVal[0], `"`, "`"),
ReplaceSpecifiedString(colAndVal[1], `\`, `\\`))
}
}
wereExper = strings.Join(wereExperSli, " and ")
oracleSQL = strings.Replace(oracleSQL, delSqlSlice[1], wereExper, 1)
whereExper = strings.Join(whereExperSli, " and ")
oracleSQL = strings.Replace(oracleSQL, delSqlSlice[1], whereExper, 1)
}
} else if strings.HasPrefix(oracleSQL, "update") {
// update "TEST"."BINARY_FLOAT6" set "COL2" ='500' where "COL1" = '3' and "COL2" = 'NULL';
updateSqlSlice := strings.Split(oracleSQL, " where ")
if len(updateSqlSlice) == 2 {
// "COL1" = '3' and "COL2" = 'NULL';
wereExper := updateSqlSlice[1]
wereExperSli := strings.Split(wereExper, " and ")
for i := range wereExperSli {
colAndVal := strings.Split(wereExperSli[i], " = ")
whereExper := updateSqlSlice[1]
whereExperSli := strings.Split(whereExper, " and ")
for i := range whereExperSli {
colAndVal := strings.Split(whereExperSli[i], " = ")
if len(colAndVal) == 2 {
wereExperSli[i] = fmt.Sprintf("%s = %s", ReplaceSpecifiedString(colAndVal[0], `"`, "`"),
whereExperSli[i] = fmt.Sprintf("%s = %s", ReplaceSpecifiedString(colAndVal[0], `"`, "`"),
ReplaceSpecifiedString(colAndVal[1], `\`, `\\`))
}
}
wereExper = strings.Join(wereExperSli, " and ")
updateSqlSlice[1] = wereExper
whereExper = strings.Join(whereExperSli, " and ")
updateSqlSlice[1] = whereExper

// update "TEST"."BINARY_FLOAT6" set "COL2" ='500' and "COL1" = 'ss'
headerExper := updateSqlSlice[0]
Expand All @@ -979,7 +978,7 @@ func (e *ExtractorOracle) parseDMLSQL(oracleRedoSQL, oracleUndoSQL string) (data
// "COL2" ='500' and "COL1" = 'ss'
setExperSli := strings.Split(headExperSli[1], " and ")
for i := range setExperSli {
colAndVal := strings.Split(wereExperSli[i], " = ")
colAndVal := strings.Split(setExperSli[i], " = ")
setExperSli[i] = fmt.Sprintf("%s = %s", ReplaceSpecifiedString(colAndVal[0], `"`, "`"),
ReplaceSpecifiedString(colAndVal[1], `\`, `\\`))
}
Expand All @@ -999,7 +998,7 @@ func (e *ExtractorOracle) parseDMLSQL(oracleRedoSQL, oracleUndoSQL string) (data
}
return oracleSQL
}
redoSQL := OracleToMySQL(oracleRedoSQL)
redoSQL := OracleToMySQLFormat(oracleRedoSQL)
p := parser.New()
stmt, _, err := p.Parse(redoSQL, "", "")
if err != nil {
Expand Down Expand Up @@ -1046,7 +1045,7 @@ func (e *ExtractorOracle) parseDMLSQL(oracleRedoSQL, oracleUndoSQL string) (data
case common.DeleteDML:
dataEvent.Rows = [][]interface{}{visitor.WhereColumnValues}
case common.UpdateDML:
undoSQL := OracleToMySQL(oracleUndoSQL)
undoSQL := OracleToMySQLFormat(oracleUndoSQL)
undoP := parser.New()
untoStmt, _, err := undoP.Parse(undoSQL, "", "")
if err != nil {
Expand Down
60 changes: 9 additions & 51 deletions driver/oracle/extractor/log_miner_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -144,18 +144,19 @@ func TestParseDMLSQL(t *testing.T) {
undo_sql string
want_rows [][]interface{}
}{
// update SQL
{
name: "TESTNULL",
sql: "update \"TEST\".\"TESTNULL\" set \"COL1\" = NULL where \"COL1\" = 'T'",
undo_sql: "update \"TEST\".\"TESTNULL\" set \"COL1\" = 'T' where \"COL1\" IS NULL",
want_rows: [][]interface{}{{"T", nil}, {nil, nil}},
},
// Insert SQL
{
name: "NCHAR_255_COLUMNS",
sql: `insert into "TEST"."NCHAR_255_COLUMNS"("COL1","COL2") values ('11',UNISTR('\6570\636E\5E93sql\6D4B\8BD5\6570\636E\5E93sql\6D4B\8BD5 '))`,
undo_sql: ``,
want_rows: [][]interface{}{{"11", "数据库sql测试数据库sql测试 "}}},
// NUMBER(*)
// BFILE no support
// BINARY_FLOAT
// insert into TEST.TEST("COL1","COL2") values (1, 3.40282E+38F);
// insert into TEST.TEST("COL1","COL2") values (2, BINARY_FLOAT_INFINITY);
// insert into TEST.TEST("COL1","COL2") values (3, -BINARY_FLOAT_INFINITY);
// insert into TEST.TEST("COL1","COL2") values (4, BINARY_FLOAT_NAN);
{
name: "BINARY_FLOAT1",
sql: `insert into "TEST"."BINARY_FLOAT1"("COL1","COL2") values ('0', '1.17549E-38F');`,
Expand Down Expand Up @@ -188,7 +189,7 @@ func TestParseDMLSQL(t *testing.T) {
},
{
name: "BINARY_FLOAT6",
sql: `update "TEST"."BINARY_FLOAT6" set "COL2" ='500' where "COL1" = '3' and "COL2" = 'NULL';`,
sql: `update "TEST"."BINARY_FLOAT6" set "COL2" = '500' where "COL1" = '3' and "COL2" = 'NULL';`,
undo_sql: `update "TEST"."BINARY_FLOAT6" set "COL2" = NULL where "COL1" = '3' and "COL2" = '50\0';`,
want_rows: [][]interface{}{{"3", nil}, {"3", "50\\0"}},
},
Expand All @@ -198,10 +199,6 @@ func TestParseDMLSQL(t *testing.T) {
undo_sql: `insert into "TEST"."BINARY_FLOAT7"("COL1","COL2") VALUES ('4', 'Nan');`,
want_rows: [][]interface{}{{"4", nil}},
},
// insert into "TEST"."DATE_COLUMNS"("COL1","COL2") values ('1',NULL)
// insert into "TEST"."DATE_COLUMNS"("COL1","COL2") values ('2',TO_DATE('-4712-01-01 00:00:00', 'SYYYY-MM-DD HH24:MI:SS'))
// insert into "TEST"."DATE_COLUMNS"("COL1","COL2") values ('3',TO_DATE(' 9999-12-31 00:00:00', 'SYYYY-MM-DD HH24:MI:SS'))
// insert into "TEST"."DATE_COLUMNS"("COL1","COL2") values ('4',TO_DATE(' 2003-05-03 21:02:44', 'SYYYY-MM-DD HH24:MI:SS'))
{
name: "DATE_COLUMNS",
sql: `insert into "TEST"."DATE_COLUMNS"("COL1","COL2") values ('1',NULL)`,
Expand All @@ -226,7 +223,6 @@ func TestParseDMLSQL(t *testing.T) {
undo_sql: ``,
want_rows: [][]interface{}{{"4", " 2003-05-03 21:02:44"}},
},
// CREATE TABLE TEST."te\shu"(COL1 INT, col2 CHAR(256))
{
name: `te\shu`,
sql: `insert into "TEST"."te\shu"("COL1","COL2") values ('5','x\x44')`,
Expand All @@ -245,11 +241,6 @@ func TestParseDMLSQL(t *testing.T) {
undo_sql: `insert into "TEST"."BINARY_FLOAT"("COL1","COL2") VALUES ('5', 'Nan');`,
want_rows: [][]interface{}{{"5", `"`}},
},
// {
// name: "NCHAR_255_COLUMNS",
// sql: `insert into "TEST"."NCHAR_255_COLUMNS"("COL1","COL2") values ('9',UNISTR('\6570\636E\5E93sql\6D4B\8BD5'))`,
// undo_sql: ``,
// want_rows: [][]interface{}{{"9", "数据库sql测试"}}},
{
name: "CHAR_255_COLUMNS2",
sql: `insert into "TEST"."CHAR_255_COLUMNS2"("COL1","COL2") values ('16','"')`,
Expand Down Expand Up @@ -521,34 +512,6 @@ func TestParseDDLSQL(t *testing.T) {
name: "createTableSQLCharRelation",
sql: `CREATE TABLE TEST.XMLTYPE_COLUMNS(ID INT, C_XMLTYPE XMLTYPE);`,
want: "CREATE TABLE `TEST`.`XMLTYPE_COLUMNS` (`ID` INT,`C_XMLTYPE` LONGTEXT) DEFAULT CHARACTER SET = UTF8MB4"},

// {
// name: "createTableSQLCharRelation",
// sql: `CREATE TABLE test."persons"(
// "first_name" VARCHAR(15) NOT NULL,
// last_name VARCHAR2(45) NOT NULL
// );`,
// want: "CREATE TABLE `TEST`.`persons (first_name VARCHAR(15),LAST_NAME VARCHAR(45)) DEFAULT CHARACTER SET = UTF8MB4"},
// {
// // 是否支持没有 p s,或者仅仅支持 p s
// //NUMERIC_NAME NUMERIC(15,2),
// //Decimal_NAME DECIMAL(15,2),
// //Dec_NAME DEC(15,2),
// // ps 都为空时候,oracle上限为 38,0 mysql上限为10,0
// name: "createTableSQLNumberRelation",
// sql: `CREATE TABLE test."persons"(
// "first_num" NUMBER(15,2) NOT NULL,
// second_num NUMBER(10) NOT NULL,
// three_num NUMBER(5,0) NOT NULL,
// last_name NUMBER NOT NULL,
// NUMERIC_NAME NUMERIC(15,2),
// Decimal_NAME DECIMAL(15,2),
// Dec_NAME DEC(15,2),
// INTEGER_NAME INTEGER,
// INT_NAME INT,
// SMALLINT_NAME SMALLINT
// );`,
// want: "CREATE TABLE `TEST`.`persons (first_num DECIMAL(15,2),SECOND_NUM BIGINT,THREE_NUM INT,LAST_NAME DOUBLE,NUMERIC_NAME NUMERIC(15,2),DECIMAL_NAME DECIMAL(15,2),DEC_NAME DEC(15,2),INTEGER_NAME INT,INT_NAME INT,SMALLINT_NAME DECIMAL(38)) DEFAULT CHARACTER SET = UTF8MB4"},
}
logger := hclog.NewNullLogger()
extractor := &ExtractorOracle{logger: logger, replicateDoDb: []*common.DataSource{}}
Expand Down Expand Up @@ -736,11 +699,6 @@ func TestParseConstraintSQL(t *testing.T) {
)`,
want: "CREATE TABLE `TEST`.`EMPLOYEES_DEMO` (`EMPLOYEE_ID` INT,`FIRST_NAME` VARCHAR(20),`LAST_NAME` VARCHAR(25) NOT NULL,`EMAIL` VARCHAR(25) NOT NULL,`PHONE_NUMBER` VARCHAR(20),`HIRE_DATE` DATETIME NOT NULL,`JOB_ID` VARCHAR(10) NOT NULL,`SALARY` DECIMAL(8,2) NOT NULL,`COMMISSION_PCT` DECIMAL(2,2),`MANAGER_ID` INT,`DEPARTMENT_ID` SMALLINT,`DN` VARCHAR(300),UNIQUE `EMP_EMAIL_UK_DEMO`(`email`)) DEFAULT CHARACTER SET = UTF8MB4",
},
// {
// name: "createOutOfLineConstraint",
// sql: `CREATE TABLE TEST.emp1 ( id number REFERENCES TEST.USERINFO11 ( ID ), NAME VARCHAR ( 8 ) );`,
// want: "CREATE TABLE `TEST`.`EMP1` (`ID` DOUBLE,`NAME` VARCHAR(8)) DEFAULT CHARACTER SET = UTF8MB4",
// },
}
logger := hclog.NewNullLogger()
extractor := &ExtractorOracle{logger: logger, replicateDoDb: []*common.DataSource{}}
Expand Down