Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

根据表名表达式匹配来过滤相应的DML操作 #4537

Open
wants to merge 13 commits into
base: master
Choose a base branch
from
2 changes: 1 addition & 1 deletion README.md
Original file line number Diff line number Diff line change
Expand Up @@ -99,5 +99,5 @@ canal 作为 MySQL binlog 增量获取和解析工具,可将变更记录投递
- [阿里巴巴数据库连接池开源项目 Druid](https://github.com/alibaba/druid)
- [阿里巴巴实时数据同步工具 DTS](https://www.aliyun.com/product/dts)

## 问题反馈
## 问题反馈
- 报告 issue: [github issues](https://github.com/alibaba/canal/issues)
3 changes: 0 additions & 3 deletions deployer/src/main/resources/canal.properties
Original file line number Diff line number Diff line change
Expand Up @@ -64,9 +64,6 @@ canal.instance.filter.query.ddl = false
canal.instance.filter.table.error = false
canal.instance.filter.rows = false
canal.instance.filter.transaction.entry = false
canal.instance.filter.dml.insert = false
canal.instance.filter.dml.update = false
canal.instance.filter.dml.delete = false

# binlog format/image check
canal.instance.binlog.format = ROW,STATEMENT,MIXED
Expand Down
6 changes: 6 additions & 0 deletions deployer/src/main/resources/example/instance.properties
Original file line number Diff line number Diff line change
Expand Up @@ -45,6 +45,12 @@ canal.instance.filter.black.regex=mysql\\.slave_.*
#canal.instance.filter.field=test1.t_product:id/subject/keywords,test2.t_company:id/name/contact/ch
# table field black filter(format: schema1.tableName1:field1/field2,schema2.tableName2:field1/field2)
#canal.instance.filter.black.field=test1.t_product:subject/product_image,test2.t_company:id/name/contact/ch
# table regex for insert-dml
canal.instance.filter.dml.insert.regex =
# table regex for update-dml
canal.instance.filter.dml.update.regex =
# table regex for delete-dml
canal.instance.filter.dml.delete.regex =

# mq config
canal.mq.topic=example
Expand Down
21 changes: 18 additions & 3 deletions deployer/src/main/resources/spring/default-instance.xml
Original file line number Diff line number Diff line change
Expand Up @@ -174,9 +174,24 @@
<property name="filterQueryDcl" value="${canal.instance.filter.query.dcl:false}" />
<property name="filterQueryDdl" value="${canal.instance.filter.query.ddl:false}" />
<property name="useDruidDdlFilter" value="${canal.instance.filter.druid.ddl:true}" />
<property name="filterDmlInsert" value="${canal.instance.filter.dml.insert:false}" />
<property name="filterDmlUpdate" value="${canal.instance.filter.dml.update:false}" />
<property name="filterDmlDelete" value="${canal.instance.filter.dml.delete:false}" />
<property name="dmlInsertTableFilter">
<bean class="com.alibaba.otter.canal.filter.aviater.AviaterRegexFilter" >
<constructor-arg index="0" value="${canal.instance.filter.dml.insert.regex:}" />
<constructor-arg index="1" value="false" />
</bean>
</property>
<property name="dmlUpdateTableFilter">
<bean class="com.alibaba.otter.canal.filter.aviater.AviaterRegexFilter" >
<constructor-arg index="0" value="${canal.instance.filter.dml.update.regex:}" />
<constructor-arg index="1" value="false" />
</bean>
</property>
<property name="dmlDeleteTableFilter">
<bean class="com.alibaba.otter.canal.filter.aviater.AviaterRegexFilter" >
<constructor-arg index="0" value="${canal.instance.filter.dml.delete.regex:}" />
<constructor-arg index="1" value="false" />
</bean>
</property>
<property name="filterRows" value="${canal.instance.filter.rows:false}" />
<property name="filterTableError" value="${canal.instance.filter.table.error:false}" />
<property name="supportBinlogFormats" value="${canal.instance.binlog.format}" />
Expand Down
21 changes: 18 additions & 3 deletions deployer/src/main/resources/spring/file-instance.xml
Original file line number Diff line number Diff line change
Expand Up @@ -160,9 +160,24 @@
<property name="filterQueryDcl" value="${canal.instance.filter.query.dcl:false}" />
<property name="filterQueryDdl" value="${canal.instance.filter.query.ddl:false}" />
<property name="useDruidDdlFilter" value="${canal.instance.filter.druid.ddl:true}" />
<property name="filterDmlInsert" value="${canal.instance.filter.dml.insert:false}" />
<property name="filterDmlUpdate" value="${canal.instance.filter.dml.update:false}" />
<property name="filterDmlDelete" value="${canal.instance.filter.dml.delete:false}" />
<property name="dmlInsertTableFilter">
<bean class="com.alibaba.otter.canal.filter.aviater.AviaterRegexFilter" >
<constructor-arg index="0" value="${canal.instance.filter.dml.insert.regex:}" />
<constructor-arg index="1" value="false" />
</bean>
</property>
<property name="dmlUpdateTableFilter">
<bean class="com.alibaba.otter.canal.filter.aviater.AviaterRegexFilter" >
<constructor-arg index="0" value="${canal.instance.filter.dml.update.regex:}" />
<constructor-arg index="1" value="false" />
</bean>
</property>
<property name="dmlDeleteTableFilter">
<bean class="com.alibaba.otter.canal.filter.aviater.AviaterRegexFilter" >
<constructor-arg index="0" value="${canal.instance.filter.dml.delete.regex:}" />
<constructor-arg index="1" value="false" />
</bean>
</property>
<property name="filterRows" value="${canal.instance.filter.rows:false}" />
<property name="filterTableError" value="${canal.instance.filter.table.error:false}" />
<property name="supportBinlogFormats" value="${canal.instance.binlog.format}" />
Expand Down
21 changes: 18 additions & 3 deletions deployer/src/main/resources/spring/group-instance.xml
Original file line number Diff line number Diff line change
Expand Up @@ -157,9 +157,24 @@
<property name="filterQueryDcl" value="${canal.instance.filter.query.dcl:false}" />
<property name="filterQueryDdl" value="${canal.instance.filter.query.ddl:false}" />
<property name="useDruidDdlFilter" value="${canal.instance.filter.druid.ddl:true}" />
<property name="filterDmlInsert" value="${canal.instance.filter.dml.insert:false}" />
<property name="filterDmlUpdate" value="${canal.instance.filter.dml.update:false}" />
<property name="filterDmlDelete" value="${canal.instance.filter.dml.delete:false}" />
<property name="dmlInsertTableFilter">
<bean class="com.alibaba.otter.canal.filter.aviater.AviaterRegexFilter" >
<constructor-arg index="0" value="${canal.instance.filter.dml.insert.regex:}" />
<constructor-arg index="1" value="false" />
</bean>
</property>
<property name="dmlUpdateTableFilter">
<bean class="com.alibaba.otter.canal.filter.aviater.AviaterRegexFilter" >
<constructor-arg index="0" value="${canal.instance.filter.dml.update.regex:}" />
<constructor-arg index="1" value="false" />
</bean>
</property>
<property name="dmlDeleteTableFilter">
<bean class="com.alibaba.otter.canal.filter.aviater.AviaterRegexFilter" >
<constructor-arg index="0" value="${canal.instance.filter.dml.delete.regex:}" />
<constructor-arg index="1" value="false" />
</bean>
</property>
<property name="filterTableError" value="${canal.instance.filter.table.error:false}" />
<property name="supportBinlogFormats" value="${canal.instance.binlog.format}" />
<property name="supportBinlogImages" value="${canal.instance.binlog.image}" />
Expand Down
21 changes: 18 additions & 3 deletions deployer/src/main/resources/spring/memory-instance.xml
Original file line number Diff line number Diff line change
Expand Up @@ -148,9 +148,24 @@
<property name="filterQueryDcl" value="${canal.instance.filter.query.dcl:false}" />
<property name="filterQueryDdl" value="${canal.instance.filter.query.ddl:false}" />
<property name="useDruidDdlFilter" value="${canal.instance.filter.druid.ddl:true}" />
<property name="filterDmlInsert" value="${canal.instance.filter.dml.insert:false}" />
<property name="filterDmlUpdate" value="${canal.instance.filter.dml.update:false}" />
<property name="filterDmlDelete" value="${canal.instance.filter.dml.delete:false}" />
<property name="dmlInsertTableFilter">
<bean class="com.alibaba.otter.canal.filter.aviater.AviaterRegexFilter" >
<constructor-arg index="0" value="${canal.instance.filter.dml.insert.regex:}" />
<constructor-arg index="1" value="false" />
</bean>
</property>
<property name="dmlUpdateTableFilter">
<bean class="com.alibaba.otter.canal.filter.aviater.AviaterRegexFilter" >
<constructor-arg index="0" value="${canal.instance.filter.dml.update.regex:}" />
<constructor-arg index="1" value="false" />
</bean>
</property>
<property name="dmlDeleteTableFilter">
<bean class="com.alibaba.otter.canal.filter.aviater.AviaterRegexFilter" >
<constructor-arg index="0" value="${canal.instance.filter.dml.delete.regex:}" />
<constructor-arg index="1" value="false" />
</bean>
</property>
<property name="filterRows" value="${canal.instance.filter.rows:false}" />
<property name="filterTableError" value="${canal.instance.filter.table.error:false}" />
<property name="supportBinlogFormats" value="${canal.instance.binlog.format}" />
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -39,9 +39,9 @@ public abstract class AbstractMysqlEventParser extends AbstractEventParser {
protected boolean filterTableError = false;
protected boolean useDruidDdlFilter = true;

protected boolean filterDmlInsert = false;
protected boolean filterDmlUpdate = false;
protected boolean filterDmlDelete = false;
protected CanalEventFilter dmlInsertTableFilter = null;
protected CanalEventFilter dmlUpdateTableFilter = null;
protected CanalEventFilter dmlDeleteTableFilter = null;
// instance received binlog bytes
protected final AtomicLong receivedBinlogBytes = new AtomicLong(0L);
private final AtomicLong eventsPublishBlockingTime = new AtomicLong(0L);
Expand Down Expand Up @@ -178,7 +178,10 @@ protected MultiStageCoprocessor buildMultiStageCoprocessor() {
parallelThreadSize,
(LogEventConvert) binlogParser,
transactionBuffer,
destination, filterDmlInsert, filterDmlUpdate, filterDmlDelete);
destination,
(AviaterRegexFilter)dmlInsertTableFilter,
(AviaterRegexFilter)dmlUpdateTableFilter,
(AviaterRegexFilter)dmlDeleteTableFilter);
mysqlMultiStageCoprocessor.setEventsPublishBlockingTime(eventsPublishBlockingTime);
return mysqlMultiStageCoprocessor;
}
Expand Down Expand Up @@ -229,28 +232,15 @@ public void setUseDruidDdlFilter(boolean useDruidDdlFilter) {
this.useDruidDdlFilter = useDruidDdlFilter;
}

public boolean isFilterDmlInsert() {
return filterDmlInsert;
public void setDmlInsertTableFilter(CanalEventFilter dmlInsertTableFilter) {
this.dmlInsertTableFilter = dmlInsertTableFilter;
}

public void setFilterDmlInsert(boolean filterDmlInsert) {
this.filterDmlInsert = filterDmlInsert;
}

public boolean isFilterDmlUpdate() {
return filterDmlUpdate;
}

public void setFilterDmlUpdate(boolean filterDmlUpdate) {
this.filterDmlUpdate = filterDmlUpdate;
}

public boolean isFilterDmlDelete() {
return filterDmlDelete;
public void setDmlUpdateTableFilter(CanalEventFilter dmlUpdateTableFilter) {
this.dmlUpdateTableFilter = dmlUpdateTableFilter;
}

public void setFilterDmlDelete(boolean filterDmlDelete) {
this.filterDmlDelete = filterDmlDelete;
public void setDmlDeleteTableFilter(CanalEventFilter dmlDeleteTableFilter) {
this.dmlDeleteTableFilter = dmlDeleteTableFilter;
}

public void setEnableTsdb(boolean enableTsdb) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -37,6 +37,8 @@
import com.taobao.tddl.dbsync.binlog.event.RowsLogEvent;
import com.taobao.tddl.dbsync.binlog.event.UpdateRowsLogEvent;
import com.taobao.tddl.dbsync.binlog.event.WriteRowsLogEvent;
import com.alibaba.otter.canal.filter.aviater.AviaterRegexFilter;
import com.taobao.tddl.dbsync.binlog.exception.TableIdNotFoundException;

/**
* 针对解析器提供一个多阶段协同的处理
Expand Down Expand Up @@ -71,21 +73,23 @@ public class MysqlMultiStageCoprocessor extends AbstractCanalLifeCycle implement
private BatchEventProcessor<MessageEvent> simpleParserStage;
private BatchEventProcessor<MessageEvent> sinkStoreStage;
private LogContext logContext;
protected boolean filterDmlInsert = false;
protected boolean filterDmlUpdate = false;
protected boolean filterDmlDelete = false;
protected volatile AviaterRegexFilter dmlInsertTableNameFilter = null;
protected volatile AviaterRegexFilter dmlUpdateTableNameFilter = null;
protected volatile AviaterRegexFilter dmlDeleteTableNameFilter = null;

public MysqlMultiStageCoprocessor(int ringBufferSize, int parserThreadCount, LogEventConvert logEventConvert,
EventTransactionBuffer transactionBuffer, String destination,
boolean filterDmlInsert, boolean filterDmlUpdate, boolean filterDmlDelete){
AviaterRegexFilter dmlInsertTableNameFilter,
AviaterRegexFilter dmlUpdateTableNameFilter,
AviaterRegexFilter dmlDeleteTableNameFilter){
this.ringBufferSize = ringBufferSize;
this.parserThreadCount = parserThreadCount;
this.logEventConvert = logEventConvert;
this.transactionBuffer = transactionBuffer;
this.destination = destination;
this.filterDmlInsert = filterDmlInsert;
this.filterDmlUpdate = filterDmlUpdate;
this.filterDmlDelete = filterDmlDelete;
this.dmlInsertTableNameFilter = dmlInsertTableNameFilter;
this.dmlUpdateTableNameFilter = dmlUpdateTableNameFilter;
this.dmlDeleteTableNameFilter = dmlDeleteTableNameFilter;
}

@Override
Expand Down Expand Up @@ -261,6 +265,14 @@ public SimpleParserStage(LogContext context){
}
}

private Boolean isDmlFilterTable(AviaterRegexFilter nameFilter, RowsLogEvent event){
if (event.getTable() == null) {
// tableId对应的记录不存在
throw new TableIdNotFoundException("not found tableId:" + event.getTableId());
}
return nameFilter.filter(event.getTable().getDbName() + "." + event.getTable().getTableName());
}

public void onEvent(MessageEvent event, long sequence, boolean endOfBatch) throws Exception {
try {
LogEvent logEvent = event.getEvent();
Expand All @@ -276,22 +288,22 @@ public void onEvent(MessageEvent event, long sequence, boolean endOfBatch) throw
switch (eventType) {
case LogEvent.WRITE_ROWS_EVENT_V1:
case LogEvent.WRITE_ROWS_EVENT:
if (!filterDmlInsert) {
if (!isDmlFilterTable(dmlInsertTableNameFilter, (WriteRowsLogEvent) logEvent)) {
tableMeta = logEventConvert.parseRowsEventForTableMeta((WriteRowsLogEvent) logEvent);
needDmlParse = true;
}
break;
case LogEvent.UPDATE_ROWS_EVENT_V1:
case LogEvent.PARTIAL_UPDATE_ROWS_EVENT:
case LogEvent.UPDATE_ROWS_EVENT:
if (!filterDmlUpdate) {
if (!isDmlFilterTable(dmlUpdateTableNameFilter, (UpdateRowsLogEvent) logEvent)) {
tableMeta = logEventConvert.parseRowsEventForTableMeta((UpdateRowsLogEvent) logEvent);
needDmlParse = true;
}
break;
case LogEvent.DELETE_ROWS_EVENT_V1:
case LogEvent.DELETE_ROWS_EVENT:
if (!filterDmlDelete) {
if (!isDmlFilterTable(dmlDeleteTableNameFilter, (DeleteRowsLogEvent) logEvent)) {
tableMeta = logEventConvert.parseRowsEventForTableMeta((DeleteRowsLogEvent) logEvent);
needDmlParse = true;
}
Expand Down