From ac38a5993c77e39776f6ad3f41f0885c9b3ec4c7 Mon Sep 17 00:00:00 2001 From: Trustpilot Robot User Date: Mon, 12 Nov 2018 07:57:31 +0100 Subject: [PATCH] Support unset operators (#12) --- .../connector/mongodb/Replicator.java | 59 ++++++++++++++----- 1 file changed, 45 insertions(+), 14 deletions(-) diff --git a/debezium-connector-mongodb/src/main/java/io/debezium/connector/mongodb/Replicator.java b/debezium-connector-mongodb/src/main/java/io/debezium/connector/mongodb/Replicator.java index 2953852787b..5c3b4cad10c 100644 --- a/debezium-connector-mongodb/src/main/java/io/debezium/connector/mongodb/Replicator.java +++ b/debezium-connector-mongodb/src/main/java/io/debezium/connector/mongodb/Replicator.java @@ -440,8 +440,8 @@ protected boolean handleOplogEvent(ServerAddress primaryAddress, Document event) // There is a new primary, so stop using this server and instead use the new primary ... // TRUSTPILOT QUICK FIX: - // - if false is returned executor stops working. It's only intended to be called on STOP/EXIT. - //return false; + // - if false is returned executor stops working, because it's only intended to be called on + // STOP/EXIT. 
throw new MongoPrimaryChangedException(String.format( "Found new primary event in oplog, so stopping use of {} to continue with new primary" , primaryAddress)); @@ -474,14 +474,7 @@ protected boolean handleOplogEvent(ServerAddress primaryAddress, Document event) if (collectionFilter.test(collectionId)) { RecordsForCollection factory = recordMakers.forCollection(collectionId); try { - // if event is for $set operation, then switch the "o" with current document state and put $set value into serialized json field for reference - // this is solution for consumers, that cannot reconstruct state from events and relies on last transactions (head) - if (event.get("o", Document.class).containsKey("$set")) { - factory.recordEvent(this.enrichSetTransaction(event, dbName, collectionName), clock.currentTimeInMillis()); - } else { - factory.recordEvent(event, clock.currentTimeInMillis()); - } - + recordEvent(event, dbName, collectionName, factory); } catch (InterruptedException e) { Thread.interrupted(); return false; @@ -491,14 +484,45 @@ protected boolean handleOplogEvent(ServerAddress primaryAddress, Document event) return true; } + // If the event is one of MongoDB's update operators: + // then extend the "o" property with the current document state and put the ${update_operator} value into the serialized JSON field '_oplog_o' for reference. + // This is a solution required to support consumers that cannot reconstruct state from events and rely on last transactions (state). + private void recordEvent(Document event, String dbName, String collectionName, RecordsForCollection factory) throws InterruptedException { + Document eventToRecord = event; + + if (eventIsMongoDbUpdateOperator(event)) { + eventToRecord = this.enrichEvent(event, dbName, collectionName); + } + + factory.recordEvent(eventToRecord, clock.currentTimeInMillis()); + } + + // Full list of Mongo update operators: https://docs.mongodb.com/manual/reference/operator/update-field/ + // Most of them translate to $set in the oplog. 
+ // Except for $unset and $rename. Both of them use an additional $unset command. + private static ArrayList<String> mongoDbUpdateOperators = new ArrayList<>(2); + static { + mongoDbUpdateOperators.add("$set"); + mongoDbUpdateOperators.add("$unset"); + } + + private boolean eventIsMongoDbUpdateOperator(Document event){ + Document o = event.get("o", Document.class); + for (String key : mongoDbUpdateOperators) { + if (o.keySet().contains(key)) { + return true; + } + } + return false; + } - private Document enrichSetTransaction(Document event, String dbName, String collectionName) { + private Document enrichEvent(Document event, String dbName, String collectionName) { // "disconnect" from Mongo driver in order to prevent large memory leak, that occurs putting large documents into ongoing event Document e = Document.parse(event.toJson()); primaryClient.execute("get current doc", client -> { - Object id = event.get("o2", Document.class).get("_id"); + Object id = e.get("o2", Document.class).get("_id"); BasicDBObject query = new BasicDBObject(); query.put("_id", id); @@ -513,8 +537,15 @@ private Document enrichSetTransaction(Document event, String dbName, String coll current.append("_id", id); } - Document set = event.get("o", Document.class); - current.append("_set", set.get("$set", Document.class).toJson()); + // Add update operator as metadata + Document updateOperator = e.get("o", Document.class); + current.append("_oplog_o", updateOperator.toJson()); + + // Backward compatibility + if (updateOperator.keySet().contains("$set")) { + current.append("_set", updateOperator.get("$set", Document.class).toJson()); + } + e.put("o", current); });