Skip to content

Commit

Permalink
Support unset operators (#12)
Browse files Browse the repository at this point in the history
  • Loading branch information
TPRobots authored and Egidijus Bartkus committed Nov 12, 2018
1 parent e581713 commit ac38a59
Showing 1 changed file with 45 additions and 14 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -440,8 +440,8 @@ protected boolean handleOplogEvent(ServerAddress primaryAddress, Document event)
// There is a new primary, so stop using this server and instead use the new primary ...

// TRUSTPILOT QUICK FIX:
// - if false is returned executor stops working. It's only intended to be called on STOP/EXIT.
//return false;
// - if false is returned executor stops working, because it's only intended to be called on
// STOP/EXIT.
throw new MongoPrimaryChangedException(String.format(
"Found new primary event in oplog, so stopping use of {} to continue with new primary"
, primaryAddress));
Expand Down Expand Up @@ -474,14 +474,7 @@ protected boolean handleOplogEvent(ServerAddress primaryAddress, Document event)
if (collectionFilter.test(collectionId)) {
RecordsForCollection factory = recordMakers.forCollection(collectionId);
try {
// if event is for $set operation, then switch the "o" with current document state and put $set value into serialized json field for reference
// this is solution for consumers, that cannot reconstruct state from events and relies on last transactions (head)
if (event.get("o", Document.class).containsKey("$set")) {
factory.recordEvent(this.enrichSetTransaction(event, dbName, collectionName), clock.currentTimeInMillis());
} else {
factory.recordEvent(event, clock.currentTimeInMillis());
}

recordEvent(event, dbName, collectionName, factory);
} catch (InterruptedException e) {
Thread.interrupted();
return false;
Expand All @@ -491,14 +484,45 @@ protected boolean handleOplogEvent(ServerAddress primaryAddress, Document event)
return true;
}

// If event is one of the MongoDB's update operators:
// then extend the "o" property with current document state and put ${update_operator} value into serialized json field '_oplog_o' for reference.
// This is solution required to support consumers, that cannot reconstruct state from events and relies on last transactions (state).
private void recordEvent(Document event, String dbName, String collectionName, RecordsForCollection factory) throws InterruptedException {
Document eventToRecord = event;

if (eventIsMongoDbUpdateOperator(event)) {
eventToRecord = this.enrichEvent(event, dbName, collectionName);
}

factory.recordEvent(eventToRecord, clock.currentTimeInMillis());
}

// Full list of Mongo update operators: https://docs.mongodb.com/manual/reference/operator/update-field/
// Most of them translates to $set in oplog.
// Except for $unset and $rename. Both of them use additional $unset command.
private static ArrayList<String> mongoDbUpdateOperators = new ArrayList<>(2);
static {
mongoDbUpdateOperators.add("$set");
mongoDbUpdateOperators.add("$unset");
}

private boolean eventIsMongoDbUpdateOperator(Document event){
Document o = event.get("o", Document.class);

for (String key : mongoDbUpdateOperators) {
if (o.keySet().contains(key)) {
return true;
}
}
return false;
}

private Document enrichSetTransaction(Document event, String dbName, String collectionName) {
private Document enrichEvent(Document event, String dbName, String collectionName) {
// "disconnect" from Mongo driver in order to prevent large memory leak, that occurs putting large documents into ongoing event
Document e = Document.parse(event.toJson());

primaryClient.execute("get current doc", client -> {
Object id = event.get("o2", Document.class).get("_id");
Object id = e.get("o2", Document.class).get("_id");
BasicDBObject query = new BasicDBObject();
query.put("_id", id);

Expand All @@ -513,8 +537,15 @@ private Document enrichSetTransaction(Document event, String dbName, String coll
current.append("_id", id);
}

Document set = event.get("o", Document.class);
current.append("_set", set.get("$set", Document.class).toJson());
// Add update operator as metadata
Document updateOperator = e.get("o", Document.class);
current.append("_oplog_o", updateOperator.toJson());

// Backward compatibility
if (updateOperator.keySet().contains("$set")) {
current.append("_set", updateOperator.get("$set", Document.class).toJson());
}

e.put("o", current);
});

Expand Down

0 comments on commit ac38a59

Please sign in to comment.