From 0f196f71cb48a6cbf4d14eeb208fbcbdf1948323 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Raimondas=20Tij=C5=ABnaitis?= Date: Thu, 24 Sep 2020 17:08:21 +0300 Subject: [PATCH] handle oplog event with unknow ops --- .travis.yml | 94 +++++++++---------- .../connector/mongodb/RecordMakers.java | 23 ++++- .../connector/mongodb/Replicator.java | 10 ++ .../connector/mongodb/TpReplicatorTests.java | 48 ++++++++++ 4 files changed, 123 insertions(+), 52 deletions(-) create mode 100644 debezium-connector-mongodb/src/test/java/io/debezium/connector/mongodb/TpReplicatorTests.java diff --git a/.travis.yml b/.travis.yml index 55293d31f77..07d79fbf9c4 100644 --- a/.travis.yml +++ b/.travis.yml @@ -1,47 +1,47 @@ -language: java -cache: - directories: - - $HOME/.m2/repository - -sudo: required - -jdk: - - oraclejdk8 - -services: - - docker - -# First stop MySQL and PostgreSQL that run by default (see DBZ-163). Then check Docker status. Finally, -# install Maven 3.3.9, since the Jolokia Maven plugin requires 3.2.1 or later but Travis current runs 3.1.x -before_install: - - sudo service mysql-5.6 stop || sudo service mysql stop - - sudo /etc/init.d/postgresql stop - - docker -v - - docker ps -a - - wget https://archive.apache.org/dist/maven/maven-3/3.3.9/binaries/apache-maven-3.3.9-bin.zip - - unzip -qq apache-maven-3.3.9-bin.zip - - export M2_HOME=$PWD/apache-maven-3.3.9 - - export PATH=$M2_HOME/bin:$PATH - - mvn -version - -# By default Travis will install dependencies before it runs a build, and with Maven projects that -# means running an install build without tests. We don't need to do this, and it actually causes problems. -# So skip it... -install: true - -after_install: - - docker ps -a - -script: - - mvn clean install -Passembly -Dversion.postgres.server=9.6-devel - - mvn install -pl debezium-connector-mysql -Pparser-antlr -DskipTests - - mvn install -pl debezium-connector-postgres -Pwal2json-decoder -Dversion.postgres.server=9.6-devel - - mvn install -pl debezium-connector-mongodb -Dversion.mongo.server=3.2.19 - -notifications: - webhooks: - urls: - - https://webhooks.gitter.im/e/44c51b6da8c2bec32645 - on_success: always # options: [always|never|change] default: always - on_failure: always # options: [always|never|change] default: always - on_start: never # options: [always|never|change] default: always +#language: java +#cache: +# directories: +# - $HOME/.m2/repository +# +#sudo: required +# +#jdk: +# - oraclejdk8 +# +#services: +# - docker +# +## First stop MySQL and PostgreSQL that run by default (see DBZ-163). Then check Docker status. Finally, +## install Maven 3.3.9, since the Jolokia Maven plugin requires 3.2.1 or later but Travis current runs 3.1.x +#before_install: +# - sudo service mysql-5.6 stop || sudo service mysql stop +# - sudo /etc/init.d/postgresql stop +# - docker -v +# - docker ps -a +# - wget https://archive.apache.org/dist/maven/maven-3/3.3.9/binaries/apache-maven-3.3.9-bin.zip +# - unzip -qq apache-maven-3.3.9-bin.zip +# - export M2_HOME=$PWD/apache-maven-3.3.9 +# - export PATH=$M2_HOME/bin:$PATH +# - mvn -version +# +## By default Travis will install dependencies before it runs a build, and with Maven projects that +## means running an install build without tests. We don't need to do this, and it actually causes problems. +## So skip it... +#install: true +# +#after_install: +# - docker ps -a +# +#script: +# - mvn clean install -Passembly -Dversion.postgres.server=9.6-devel +# - mvn install -pl debezium-connector-mysql -Pparser-antlr -DskipTests +# - mvn install -pl debezium-connector-postgres -Pwal2json-decoder -Dversion.postgres.server=9.6-devel +# - mvn install -pl debezium-connector-mongodb -Dversion.mongo.server=3.2.19 +# +#notifications: +# webhooks: +# urls: +# - https://webhooks.gitter.im/e/44c51b6da8c2bec32645 +# on_success: always # options: [always|never|change] default: always +# on_failure: always # options: [always|never|change] default: always +# on_start: never # options: [always|never|change] default: always diff --git a/debezium-connector-mongodb/src/main/java/io/debezium/connector/mongodb/RecordMakers.java b/debezium-connector-mongodb/src/main/java/io/debezium/connector/mongodb/RecordMakers.java index cf05e24cfd8..b8b722d151f 100644 --- a/debezium-connector-mongodb/src/main/java/io/debezium/connector/mongodb/RecordMakers.java +++ b/debezium-connector-mongodb/src/main/java/io/debezium/connector/mongodb/RecordMakers.java @@ -5,6 +5,7 @@ */ package io.debezium.connector.mongodb; +import java.util.Collections; import java.util.HashMap; import java.util.Map; import java.util.function.Function; @@ -41,11 +42,18 @@ public class RecordMakers { private static final ObjectSerializer jsonSerializer = JSONSerializers.getStrict(); - private static final Map operationLiterals = new HashMap<>(); + + @ThreadSafe + private static final Map OPERATION_LITERALS; + static { - operationLiterals.put("i", Operation.CREATE); - operationLiterals.put("u", Operation.UPDATE); - operationLiterals.put("d", Operation.DELETE); + Map literals = new HashMap<>(); + + literals.put("i", Operation.CREATE); + literals.put("u", Operation.UPDATE); + literals.put("d", Operation.DELETE); + + OPERATION_LITERALS = Collections.unmodifiableMap(literals); } private final Logger logger = LoggerFactory.getLogger(getClass()); @@ -188,7 +196,7 @@ public int recordEvent(Document oplogEvent, long timestamp) throws InterruptedEx Object o2 = oplogEvent.get("o2"); String objId = o2 != null ? idObjToJson(o2) : idObjToJson(patchObj); assert objId != null; - Operation operation = operationLiterals.get(oplogEvent.getString("op")); + Operation operation = OPERATION_LITERALS.get(oplogEvent.getString("op")); return createRecords(sourceValue, offset, operation, objId, patchObj, timestamp); } @@ -255,4 +263,9 @@ public void clear() { logger.debug("Clearing table converters"); recordMakerByCollectionId.clear(); } + + public static boolean isValidOperation(String operation) { + return OPERATION_LITERALS.containsKey(operation); + } + } diff --git a/debezium-connector-mongodb/src/main/java/io/debezium/connector/mongodb/Replicator.java b/debezium-connector-mongodb/src/main/java/io/debezium/connector/mongodb/Replicator.java index 699d941d7f3..6f6af73796b 100644 --- a/debezium-connector-mongodb/src/main/java/io/debezium/connector/mongodb/Replicator.java +++ b/debezium-connector-mongodb/src/main/java/io/debezium/connector/mongodb/Replicator.java @@ -437,6 +437,11 @@ protected boolean handleOplogEvent(ServerAddress primaryAddress, Document event) logger.warn("Missing 'o' field in event, so skipping {}", event.toJson()); return true; } + + if (!eventOpIsSupported(event)) { + return true; + } + if (ns == null || ns.isEmpty()) { // These are replica set events ... String msg = object.getString("msg"); @@ -524,6 +529,11 @@ private void recordEvent(Document event, String dbName, String collectionName, R mongoDbUpdateOperators.add("$unset"); } + private boolean eventOpIsSupported(Document event) { + String op = event.getString("op"); + return RecordMakers.isValidOperation(op); + } + private boolean eventIsMongoDbUpdateOperator(Document event){ Document o = event.get("o", Document.class); diff --git a/debezium-connector-mongodb/src/test/java/io/debezium/connector/mongodb/TpReplicatorTests.java b/debezium-connector-mongodb/src/test/java/io/debezium/connector/mongodb/TpReplicatorTests.java new file mode 100644 index 00000000000..ba1d99ed639 --- /dev/null +++ b/debezium-connector-mongodb/src/test/java/io/debezium/connector/mongodb/TpReplicatorTests.java @@ -0,0 +1,48 @@ +//package io.debezium.connector.mongodb; +// +//import io.debezium.config.Configuration; +//import io.debezium.util.Testing; +//import org.apache.kafka.connect.source.SourceRecord; +//import org.junit.Before; +//import org.junit.Test; +// +//import java.util.LinkedList; +//import java.util.List; +// +//import static org.fest.assertions.Assertions.assertThat; +// +// +//public class TpReplicatorTests extends AbstractMongoIT { +// +// @Before +// public void beforeEach() { +// Testing.Print.disable(); +// Testing.Debug.disable(); +// this.useConfiguration(TpReplicatorTests.getConfiguration()); +// } +// +// @Test +// public void shouldReturnVersion() throws InterruptedException { +// assertThat(Module.version()).isNotNull(); +// assertThat(Module.version()).isNotEmpty(); +// +// +// List records = new LinkedList<>(); +// Replicator replicator = new Replicator(context, replicaSet, records::add, (x) -> {}); +// Thread thread = new Thread(replicator::run); +// thread.start(); +// +// Thread.sleep(Integer.MAX_VALUE); +// } +// +// public static Configuration getConfiguration() { +// return Configuration.fromSystemProperties("connector.").edit() +// .withDefault(MongoDbConnectorConfig.HOSTS, "set-545a4ec660e6d158ff007ced/trustpilot-m1.tp-staging.com:27017") +// .withDefault(MongoDbConnectorConfig.COLLECTION_WHITELIST, "Trustpilot.BusinessUnit") +// .withDefault(MongoDbConnectorConfig.PASSWORD, "******") +// .withDefault(MongoDbConnectorConfig.USER, "TP_OplogReader") +// .withDefault(MongoDbConnectorConfig.AUTO_DISCOVER_MEMBERS, true) +// .withDefault(MongoDbConnectorConfig.LOGICAL_NAME, "mongo.tp").build(); +// } +// +//}