Skip to content

Commit

Permalink
handle oplog event with unknow ops
Browse files Browse the repository at this point in the history
  • Loading branch information
raimondast committed Sep 24, 2020
1 parent f4458cf commit 0f196f7
Show file tree
Hide file tree
Showing 4 changed files with 123 additions and 52 deletions.
94 changes: 47 additions & 47 deletions .travis.yml
Original file line number Diff line number Diff line change
@@ -1,47 +1,47 @@
language: java
cache:
directories:
- $HOME/.m2/repository

sudo: required

jdk:
- oraclejdk8

services:
- docker

# First stop MySQL and PostgreSQL that run by default (see DBZ-163). Then check Docker status. Finally,
# install Maven 3.3.9, since the Jolokia Maven plugin requires 3.2.1 or later but Travis current runs 3.1.x
before_install:
- sudo service mysql-5.6 stop || sudo service mysql stop
- sudo /etc/init.d/postgresql stop
- docker -v
- docker ps -a
- wget https://archive.apache.org/dist/maven/maven-3/3.3.9/binaries/apache-maven-3.3.9-bin.zip
- unzip -qq apache-maven-3.3.9-bin.zip
- export M2_HOME=$PWD/apache-maven-3.3.9
- export PATH=$M2_HOME/bin:$PATH
- mvn -version

# By default Travis will install dependencies before it runs a build, and with Maven projects that
# means running an install build without tests. We don't need to do this, and it actually causes problems.
# So skip it...
install: true

after_install:
- docker ps -a

script:
- mvn clean install -Passembly -Dversion.postgres.server=9.6-devel
- mvn install -pl debezium-connector-mysql -Pparser-antlr -DskipTests
- mvn install -pl debezium-connector-postgres -Pwal2json-decoder -Dversion.postgres.server=9.6-devel
- mvn install -pl debezium-connector-mongodb -Dversion.mongo.server=3.2.19

notifications:
webhooks:
urls:
- https://webhooks.gitter.im/e/44c51b6da8c2bec32645
on_success: always # options: [always|never|change] default: always
on_failure: always # options: [always|never|change] default: always
on_start: never # options: [always|never|change] default: always
#language: java
#cache:
# directories:
# - $HOME/.m2/repository
#
#sudo: required
#
#jdk:
# - oraclejdk8
#
#services:
# - docker
#
## First stop MySQL and PostgreSQL that run by default (see DBZ-163). Then check Docker status. Finally,
## install Maven 3.3.9, since the Jolokia Maven plugin requires 3.2.1 or later but Travis current runs 3.1.x
#before_install:
# - sudo service mysql-5.6 stop || sudo service mysql stop
# - sudo /etc/init.d/postgresql stop
# - docker -v
# - docker ps -a
# - wget https://archive.apache.org/dist/maven/maven-3/3.3.9/binaries/apache-maven-3.3.9-bin.zip
# - unzip -qq apache-maven-3.3.9-bin.zip
# - export M2_HOME=$PWD/apache-maven-3.3.9
# - export PATH=$M2_HOME/bin:$PATH
# - mvn -version
#
## By default Travis will install dependencies before it runs a build, and with Maven projects that
## means running an install build without tests. We don't need to do this, and it actually causes problems.
## So skip it...
#install: true
#
#after_install:
# - docker ps -a
#
#script:
# - mvn clean install -Passembly -Dversion.postgres.server=9.6-devel
# - mvn install -pl debezium-connector-mysql -Pparser-antlr -DskipTests
# - mvn install -pl debezium-connector-postgres -Pwal2json-decoder -Dversion.postgres.server=9.6-devel
# - mvn install -pl debezium-connector-mongodb -Dversion.mongo.server=3.2.19
#
#notifications:
# webhooks:
# urls:
# - https://webhooks.gitter.im/e/44c51b6da8c2bec32645
# on_success: always # options: [always|never|change] default: always
# on_failure: always # options: [always|never|change] default: always
# on_start: never # options: [always|never|change] default: always
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,7 @@
*/
package io.debezium.connector.mongodb;

import java.util.Collections;
import java.util.HashMap;
import java.util.Map;
import java.util.function.Function;
Expand Down Expand Up @@ -41,11 +42,18 @@
public class RecordMakers {

private static final ObjectSerializer jsonSerializer = JSONSerializers.getStrict();
private static final Map<String, Operation> operationLiterals = new HashMap<>();

@ThreadSafe
private static final Map<String, Operation> OPERATION_LITERALS;

static {
operationLiterals.put("i", Operation.CREATE);
operationLiterals.put("u", Operation.UPDATE);
operationLiterals.put("d", Operation.DELETE);
Map<String, Operation> literals = new HashMap<>();

literals.put("i", Operation.CREATE);
literals.put("u", Operation.UPDATE);
literals.put("d", Operation.DELETE);

OPERATION_LITERALS = Collections.unmodifiableMap(literals);
}

private final Logger logger = LoggerFactory.getLogger(getClass());
Expand Down Expand Up @@ -188,7 +196,7 @@ public int recordEvent(Document oplogEvent, long timestamp) throws InterruptedEx
Object o2 = oplogEvent.get("o2");
String objId = o2 != null ? idObjToJson(o2) : idObjToJson(patchObj);
assert objId != null;
Operation operation = operationLiterals.get(oplogEvent.getString("op"));
Operation operation = OPERATION_LITERALS.get(oplogEvent.getString("op"));
return createRecords(sourceValue, offset, operation, objId, patchObj, timestamp);
}

Expand Down Expand Up @@ -255,4 +263,9 @@ public void clear() {
logger.debug("Clearing table converters");
recordMakerByCollectionId.clear();
}

public static boolean isValidOperation(String operation) {
return OPERATION_LITERALS.containsKey(operation);
}

}
Original file line number Diff line number Diff line change
Expand Up @@ -437,6 +437,11 @@ protected boolean handleOplogEvent(ServerAddress primaryAddress, Document event)
logger.warn("Missing 'o' field in event, so skipping {}", event.toJson());
return true;
}

if (!eventOpIsSupported(event)) {
return true;
}

if (ns == null || ns.isEmpty()) {
// These are replica set events ...
String msg = object.getString("msg");
Expand Down Expand Up @@ -524,6 +529,11 @@ private void recordEvent(Document event, String dbName, String collectionName, R
mongoDbUpdateOperators.add("$unset");
}

private boolean eventOpIsSupported(Document event) {
String op = event.getString("op");
return RecordMakers.isValidOperation(op);
}

private boolean eventIsMongoDbUpdateOperator(Document event){
Document o = event.get("o", Document.class);

Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,48 @@
//package io.debezium.connector.mongodb;
//
//import io.debezium.config.Configuration;
//import io.debezium.util.Testing;
//import org.apache.kafka.connect.source.SourceRecord;
//import org.junit.Before;
//import org.junit.Test;
//
//import java.util.LinkedList;
//import java.util.List;
//
//import static org.fest.assertions.Assertions.assertThat;
//
//
//public class TpReplicatorTests extends AbstractMongoIT {
//
// @Before
// public void beforeEach() {
// Testing.Print.disable();
// Testing.Debug.disable();
// this.useConfiguration(TpReplicatorTests.getConfiguration());
// }
//
// @Test
// public void shouldReturnVersion() throws InterruptedException {
// assertThat(Module.version()).isNotNull();
// assertThat(Module.version()).isNotEmpty();
//
//
// List<SourceRecord> records = new LinkedList<>();
// Replicator replicator = new Replicator(context, replicaSet, records::add, (x) -> {});
// Thread thread = new Thread(replicator::run);
// thread.start();
//
// Thread.sleep(Integer.MAX_VALUE);
// }
//
// public static Configuration getConfiguration() {
// return Configuration.fromSystemProperties("connector.").edit()
// .withDefault(MongoDbConnectorConfig.HOSTS, "set-545a4ec660e6d158ff007ced/trustpilot-m1.tp-staging.com:27017")
// .withDefault(MongoDbConnectorConfig.COLLECTION_WHITELIST, "Trustpilot.BusinessUnit")
// .withDefault(MongoDbConnectorConfig.PASSWORD, "******")
// .withDefault(MongoDbConnectorConfig.USER, "TP_OplogReader")
// .withDefault(MongoDbConnectorConfig.AUTO_DISCOVER_MEMBERS, true)
// .withDefault(MongoDbConnectorConfig.LOGICAL_NAME, "mongo.tp").build();
// }
//
//}

0 comments on commit 0f196f7

Please sign in to comment.