Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Trustpilot support unset operators #12

Merged
merged 4 commits into from
Nov 12, 2018
Merged
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -440,8 +440,8 @@ protected boolean handleOplogEvent(ServerAddress primaryAddress, Document event)
// There is a new primary, so stop using this server and instead use the new primary ...

// TRUSTPILOT QUICK FIX:
// - if false is returned executor stops working. It's only intended to be called on STOP/EXIT.
//return false;
// - if false is returned executor stops working, because it's only intended to be called on
// STOP/EXIT.
throw new MongoPrimaryChangedException(String.format(
"Found new primary event in oplog, so stopping use of {} to continue with new primary"
, primaryAddress));
Expand Down Expand Up @@ -474,14 +474,7 @@ protected boolean handleOplogEvent(ServerAddress primaryAddress, Document event)
if (collectionFilter.test(collectionId)) {
RecordsForCollection factory = recordMakers.forCollection(collectionId);
try {
// if event is for $set operation, then switch the "o" with current document state and put $set value into serialized json field for reference
// this is solution for consumers, that cannot reconstruct state from events and relies on last transactions (head)
if (event.get("o", Document.class).containsKey("$set")) {
factory.recordEvent(this.enrichSetTransaction(event, dbName, collectionName), clock.currentTimeInMillis());
} else {
factory.recordEvent(event, clock.currentTimeInMillis());
}

recordEvent(event, dbName, collectionName, factory);
} catch (InterruptedException e) {
Thread.interrupted();
return false;
Expand All @@ -491,14 +484,45 @@ protected boolean handleOplogEvent(ServerAddress primaryAddress, Document event)
return true;
}

// If event is one of the MongoDB's update operators:
// then extend the "o" property with current document state and put ${update_operator} value into serialized json field '_oplog_o' for reference.
// This is solution required to support consumers, that cannot reconstruct state from events and relies on last transactions (state).
private void recordEvent(Document event, String dbName, String collectionName, RecordsForCollection factory) throws InterruptedException {
Document eventToRecord = event;

if (eventIsMongoDbUpdateOperator(event)) {
eventToRecord = this.enrichEvent(event, dbName, collectionName);
}

factory.recordEvent(eventToRecord, clock.currentTimeInMillis());
}

// Full list of Mongo update operators: https://docs.mongodb.com/manual/reference/operator/update-field/
// Most of them translates to $set in oplog.
// Except for $unset and $rename. Both of them use additional $unset command.
private static ArrayList<String> mongoDbUpdateOperators = new ArrayList<>(2);
static {
mongoDbUpdateOperators.add("$set");
mongoDbUpdateOperators.add("$unset");
}

private boolean eventIsMongoDbUpdateOperator(Document event){
Document o = event.get("o", Document.class);

for (String key : mongoDbUpdateOperators) {
if (o.keySet().contains(key)) {
return true;
}
}
return false;
}

private Document enrichSetTransaction(Document event, String dbName, String collectionName) {
private Document enrichEvent(Document event, String dbName, String collectionName) {
// "disconnect" from Mongo driver in order to prevent large memory leak, that occurs putting large documents into ongoing event
Document e = Document.parse(event.toJson());

primaryClient.execute("get current doc", client -> {
Object id = event.get("o2", Document.class).get("_id");
Object id = e.get("o2", Document.class).get("_id");
BasicDBObject query = new BasicDBObject();
query.put("_id", id);

Expand All @@ -513,8 +537,15 @@ private Document enrichSetTransaction(Document event, String dbName, String coll
current.append("_id", id);
}

Document set = event.get("o", Document.class);
current.append("_set", set.get("$set", Document.class).toJson());
// Add update operator as metadata
Document updateOperator = e.get("o", Document.class);
current.append("_oplog_o", updateOperator.toJson());

// Backward compatibility
if (updateOperator.keySet().contains("$set")) {
current.append("_set", updateOperator.get("$set", Document.class).toJson());
}

e.put("o", current);
});

Expand Down