diff --git a/components/mediation/inbound-endpoints/org.wso2.micro.integrator.inbound.endpoint/src/main/java/org/wso2/carbon/inbound/endpoint/protocol/cdc/CDCInjectHandler.java b/components/mediation/inbound-endpoints/org.wso2.micro.integrator.inbound.endpoint/src/main/java/org/wso2/carbon/inbound/endpoint/protocol/cdc/CDCInjectHandler.java index 00e5fe2127..baad141530 100644 --- a/components/mediation/inbound-endpoints/org.wso2.micro.integrator.inbound.endpoint/src/main/java/org/wso2/carbon/inbound/endpoint/protocol/cdc/CDCInjectHandler.java +++ b/components/mediation/inbound-endpoints/org.wso2.micro.integrator.inbound.endpoint/src/main/java/org/wso2/carbon/inbound/endpoint/protocol/cdc/CDCInjectHandler.java @@ -40,10 +40,10 @@ import java.util.Map; import java.util.Properties; -import static org.wso2.carbon.inbound.endpoint.protocol.cdc.InboundCDCConstants.DATABASE_NAME; -import static org.wso2.carbon.inbound.endpoint.protocol.cdc.InboundCDCConstants.OPERATIONS; -import static org.wso2.carbon.inbound.endpoint.protocol.cdc.InboundCDCConstants.TABLES; -import static org.wso2.carbon.inbound.endpoint.protocol.cdc.InboundCDCConstants.TS_MS; +import static org.wso2.carbon.inbound.endpoint.protocol.cdc.InboundCDCConstants.CDC_DATABASE_NAME; +import static org.wso2.carbon.inbound.endpoint.protocol.cdc.InboundCDCConstants.CDC_OPERATIONS; +import static org.wso2.carbon.inbound.endpoint.protocol.cdc.InboundCDCConstants.CDC_TABLES; +import static org.wso2.carbon.inbound.endpoint.protocol.cdc.InboundCDCConstants.CDC_TS_MS; public class CDCInjectHandler { @@ -73,9 +73,7 @@ public boolean invoke(Object object, String inboundEndpointName) throws SynapseE ChangeEvent eventRecord = (ChangeEvent) object; if (eventRecord == null || eventRecord.value() == null) { - if (logger.isDebugEnabled()) { - logger.debug("CDC Source Handler received empty event record"); - } + logger.debug("CDC Source Handler received empty event record"); } else { InputStream in = null; try { @@ -87,10 +85,10 @@ public boolean invoke(Object object, String inboundEndpointName) throws SynapseE CustomLogSetter.getInstance().setLogAppender(inboundEndpoint.getArtifactContainerName()); CDCEventOutput cdcEventOutput = new CDCEventOutput(eventRecord); - msgCtx.setProperty(DATABASE_NAME, cdcEventOutput.getDatabase()); - msgCtx.setProperty(TABLES, cdcEventOutput.getTable().toString()); - msgCtx.setProperty(OPERATIONS, cdcEventOutput.getOp()); - msgCtx.setProperty(TS_MS, cdcEventOutput.getTs_ms().toString()); + msgCtx.setProperty(CDC_DATABASE_NAME, cdcEventOutput.getDatabase()); + msgCtx.setProperty(CDC_TABLES, cdcEventOutput.getTable().toString()); + msgCtx.setProperty(CDC_OPERATIONS, cdcEventOutput.getOp()); + msgCtx.setProperty(CDC_TS_MS, cdcEventOutput.getTs_ms().toString()); if (logger.isDebugEnabled()) { logger.debug("Processed event : " + eventRecord); diff --git a/components/mediation/inbound-endpoints/org.wso2.micro.integrator.inbound.endpoint/src/main/java/org/wso2/carbon/inbound/endpoint/protocol/cdc/CDCPollingConsumer.java b/components/mediation/inbound-endpoints/org.wso2.micro.integrator.inbound.endpoint/src/main/java/org/wso2/carbon/inbound/endpoint/protocol/cdc/CDCPollingConsumer.java index 3e901f8bcd..c10698dd6f 100644 --- a/components/mediation/inbound-endpoints/org.wso2.micro.integrator.inbound.endpoint/src/main/java/org/wso2/carbon/inbound/endpoint/protocol/cdc/CDCPollingConsumer.java +++ b/components/mediation/inbound-endpoints/org.wso2.micro.integrator.inbound.endpoint/src/main/java/org/wso2/carbon/inbound/endpoint/protocol/cdc/CDCPollingConsumer.java @@ -99,14 +99,9 @@ public void execute() { * according to the registered handler */ public ChangeEvent poll() { - - if (logger.isDebugEnabled()) { - logger.debug("Start : listening to DB events : "); - } + logger.debug("Start : listening to DB events : "); listenDataChanges(); - if (logger.isDebugEnabled()) { - logger.debug("End : Listening to DB events : "); - } + logger.debug("End : Listening to DB events : "); return null; } diff --git a/components/mediation/inbound-endpoints/org.wso2.micro.integrator.inbound.endpoint/src/main/java/org/wso2/carbon/inbound/endpoint/protocol/cdc/CDCProcessor.java b/components/mediation/inbound-endpoints/org.wso2.micro.integrator.inbound.endpoint/src/main/java/org/wso2/carbon/inbound/endpoint/protocol/cdc/CDCProcessor.java index f5dcea45c9..41479d471e 100644 --- a/components/mediation/inbound-endpoints/org.wso2.micro.integrator.inbound.endpoint/src/main/java/org/wso2/carbon/inbound/endpoint/protocol/cdc/CDCProcessor.java +++ b/components/mediation/inbound-endpoints/org.wso2.micro.integrator.inbound.endpoint/src/main/java/org/wso2/carbon/inbound/endpoint/protocol/cdc/CDCProcessor.java @@ -18,9 +18,6 @@ package org.wso2.carbon.inbound.endpoint.protocol.cdc; -import io.debezium.engine.ChangeEvent; -import io.debezium.engine.DebeziumEngine; -import io.debezium.engine.format.Json; import org.apache.commons.lang3.StringUtils; import org.apache.commons.logging.Log; import org.apache.commons.logging.LogFactory; @@ -39,10 +36,15 @@ import java.io.File; import java.io.IOException; +import java.util.List; import java.util.Properties; import java.util.regex.Matcher; import java.util.regex.Pattern; +import java.util.Set; +import java.util.stream.Collectors; +import java.util.stream.Stream; +import static org.wso2.carbon.inbound.endpoint.protocol.cdc.InboundCDCConstants.DEBEZIUM_ALLOWED_OPERATIONS; import static org.wso2.carbon.inbound.endpoint.protocol.cdc.InboundCDCConstants.DEBEZIUM_DATABASE_ALLOW_PUBLIC_KEY_RETRIEVAL; import static org.wso2.carbon.inbound.endpoint.protocol.cdc.InboundCDCConstants.DEBEZIUM_DATABASE_PASSWORD; import static org.wso2.carbon.inbound.endpoint.protocol.cdc.InboundCDCConstants.DEBEZIUM_KEY_CONVERTER; @@ -55,6 +57,7 @@ import static org.wso2.carbon.inbound.endpoint.protocol.cdc.InboundCDCConstants.DEBEZIUM_TOPIC_PREFIX; import static org.wso2.carbon.inbound.endpoint.protocol.cdc.InboundCDCConstants.DEBEZIUM_VALUE_CONVERTER; import static org.wso2.carbon.inbound.endpoint.protocol.cdc.InboundCDCConstants.DEBEZIUM_VALUE_CONVERTER_SCHEMAS_ENABLE; +import static org.wso2.carbon.inbound.endpoint.protocol.cdc.InboundCDCConstants.DEBEZIUM_SKIPPED_OPERATIONS; import static org.wso2.carbon.inbound.endpoint.protocol.cdc.InboundCDCConstants.TRUE; public class CDCProcessor extends InboundRequestProcessorImpl implements TaskStartupObserver, InboundTaskProcessor { @@ -72,6 +75,9 @@ public class CDCProcessor extends InboundRequestProcessorImpl implements TaskSta private static final String FILE_SCHEMA_HISTORY_STORAGE_CLASS = "io.debezium.storage.file.history.FileSchemaHistory"; private static final Log LOGGER = LogFactory.getLog(CDCProcessor.class); + private enum operations {create, update, delete, truncate}; + private enum opCodes {c, u, d, t}; + public CDCProcessor(InboundProcessorParams params) { this.name = params.getName(); this.injectingSeq = params.getInjectingSeq(); @@ -128,6 +134,11 @@ private void setProperties () { this.cdcProperties.setProperty(DEBEZIUM_DATABASE_ALLOW_PUBLIC_KEY_RETRIEVAL, TRUE); } + if (this.cdcProperties.getProperty(DEBEZIUM_ALLOWED_OPERATIONS) != null) { + this.cdcProperties.setProperty(DEBEZIUM_SKIPPED_OPERATIONS, + getSkippedOperationsString(this.cdcProperties.getProperty(DEBEZIUM_ALLOWED_OPERATIONS))); + } + if (this.cdcProperties.getProperty(DEBEZIUM_TOPIC_PREFIX) == null) { this.cdcProperties.setProperty(DEBEZIUM_TOPIC_PREFIX, this.name +"_topic"); } @@ -233,4 +244,34 @@ public void destroy(boolean removeTask) { public void update() { // This will not be called for inbound endpoints } + + private String getOpCode(String op) { + if (op != null) { + switch (operations.valueOf(op)) { + case create: + return opCodes.c.toString(); + case update: + return opCodes.u.toString(); + case delete: + return opCodes.d.toString(); + case truncate: + return opCodes.t.toString(); + } + } + return ""; + } + + /** + * Get the comma separated list containing allowed operations and returns the string of skipped operation codes + * @param allowedOperationsString string + * @return the coma separated string of skipped operation codes + */ + private String getSkippedOperationsString(String allowedOperationsString) { + List allOperations = Stream.of(opCodes.values()).map(Enum :: toString).collect(Collectors.toList()); + Set allowedOperationsSet = Stream.of(allowedOperationsString.split(",")). + map(String :: trim).map(String :: toLowerCase).map(op -> getOpCode(op)). + collect(Collectors.toSet()); + allOperations.removeAll(allowedOperationsSet); + return String.join(",", allOperations); + } } diff --git a/components/mediation/inbound-endpoints/org.wso2.micro.integrator.inbound.endpoint/src/main/java/org/wso2/carbon/inbound/endpoint/protocol/cdc/CDCTask.java b/components/mediation/inbound-endpoints/org.wso2.micro.integrator.inbound.endpoint/src/main/java/org/wso2/carbon/inbound/endpoint/protocol/cdc/CDCTask.java index 4795fab44a..a81d0b6fa9 100644 --- a/components/mediation/inbound-endpoints/org.wso2.micro.integrator.inbound.endpoint/src/main/java/org/wso2/carbon/inbound/endpoint/protocol/cdc/CDCTask.java +++ b/components/mediation/inbound-endpoints/org.wso2.micro.integrator.inbound.endpoint/src/main/java/org/wso2/carbon/inbound/endpoint/protocol/cdc/CDCTask.java @@ -42,9 +42,7 @@ public CDCTask(CDCPollingConsumer pollingConsumer, long interval) { } protected void taskExecute() { - if (logger.isDebugEnabled()) { - logger.debug("CDC Task executing."); - } + logger.debug("CDC Task executing."); pollingConsumer.execute(); } @@ -54,14 +52,10 @@ public Properties getInboundProperties() { } public void init(SynapseEnvironment synapseEnvironment) { - if (logger.isDebugEnabled()) { - logger.debug("Initializing Task."); - } + logger.debug("Initializing Task."); } public void destroy() { - if (logger.isDebugEnabled()) { - logger.debug("Destroying Task. "); - } + logger.debug("Destroying Task. "); } } diff --git a/components/mediation/inbound-endpoints/org.wso2.micro.integrator.inbound.endpoint/src/main/java/org/wso2/carbon/inbound/endpoint/protocol/cdc/InboundCDCConstants.java b/components/mediation/inbound-endpoints/org.wso2.micro.integrator.inbound.endpoint/src/main/java/org/wso2/carbon/inbound/endpoint/protocol/cdc/InboundCDCConstants.java index c23931d9b2..386cb7bae7 100644 --- a/components/mediation/inbound-endpoints/org.wso2.micro.integrator.inbound.endpoint/src/main/java/org/wso2/carbon/inbound/endpoint/protocol/cdc/InboundCDCConstants.java +++ b/components/mediation/inbound-endpoints/org.wso2.micro.integrator.inbound.endpoint/src/main/java/org/wso2/carbon/inbound/endpoint/protocol/cdc/InboundCDCConstants.java @@ -36,11 +36,15 @@ class InboundCDCConstants { public static final String DEBEZIUM_SCHEMA_HISTORY_INTERNAL = "schema.history.internal"; public static final String DEBEZIUM_SCHEMA_HISTORY_INTERNAL_FILE_FILENAME = "schema.history.internal.file.filename"; + public static final String DEBEZIUM_SKIPPED_OPERATIONS = "skipped.operations"; + public static final String DEBEZIUM_ALLOWED_OPERATIONS = "allowed.operations"; + /** Output Properties **/ - public static final String DATABASE_NAME = "database"; - public static final String TABLES ="tables"; - public static final String OPERATIONS ="operations"; + public static final String CDC_DATABASE_NAME = "cdc.database"; + public static final String CDC_TABLES ="cdc.tables"; + public static final String CDC_OPERATIONS ="cdc.operations"; + public static final String CDC_TS_MS = "cdc.ts_ms"; public static final String TS_MS = "ts_ms"; public static final String BEFORE = "before";