Fix 100% CPU usage when starting multiple ChangeStreams #181

Open · wants to merge 4 commits into base: main
@@ -644,7 +644,7 @@ private Document commandChangeStreamPipeline(Document query, Oplog oplog, String
int batchSize = (int) cursorDocument.getOrDefault("batchSize", 0);

String namespace = getFullCollectionNamespace(collectionName);
Cursor cursor = oplog.createCursor(changeStreamDocument, namespace, aggregation);
Cursor cursor = oplog.createChangeStreamCursor(changeStreamDocument, namespace, aggregation);
return Utils.firstBatchCursorResponse(namespace, cursor.takeDocuments(batchSize), cursor);
}

@@ -4,8 +4,9 @@
import java.util.NoSuchElementException;

import de.bwaldvogel.mongo.bson.Document;
import de.bwaldvogel.mongo.oplog.OplogPosition;

public class EmptyCursor extends AbstractCursor {
public class EmptyCursor extends AbstractCursor implements TailableCursor {

private static final long EMPTY_CURSOR_ID = 0L;

@@ -33,4 +34,9 @@ public List<Document> takeDocuments(int numberToReturn) {
public String toString() {
return getClass().getSimpleName() + "()";
}

@Override
public OplogPosition getPosition() {
return null;
Review comment (Owner):
Is this ever supposed to be invoked?
If not, wouldn't it be cleaner to throw an UnsupportedOperationException?

OTOH, if it's never supposed to be invoked, to me this fact suggests that the TailableCursor refactoring might not be a good idea after all.

Reply (Contributor, author):
It is meant to be invoked; I'll move it to a separate PR that requires it.

}
}
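
For context, here is a minimal sketch (hypothetical, not part of this PR's diff) of the alternative the reviewer suggests above, in which EmptyCursor fails loudly instead of returning null:

    @Override
    public OplogPosition getPosition() {
        // Hypothetical alternative discussed in the review thread: an empty cursor has
        // no oplog position, so signal misuse instead of silently returning null.
        throw new UnsupportedOperationException("EmptyCursor has no oplog position");
    }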
@@ -0,0 +1,9 @@
package de.bwaldvogel.mongo.backend;

import de.bwaldvogel.mongo.oplog.OplogPosition;

public interface TailableCursor extends Cursor {
Review comment (Owner, bwaldvogel, Apr 11, 2021):
Is the TailableCursor refactoring relevant for the bugfix?
If not, please keep it out of this PR and we can discuss the refactoring in a follow-up.

Reply (Contributor, author):
It's not; you're right. I will raise a separate PR for tailable cursors.


OplogPosition getPosition();

}
core/src/main/java/de/bwaldvogel/mongo/oplog/ChangeStreamCursor.java (143 additions, 0 deletions)
@@ -0,0 +1,143 @@
package de.bwaldvogel.mongo.oplog;

import java.util.List;
import java.util.stream.Collectors;

import de.bwaldvogel.mongo.MongoBackend;
import de.bwaldvogel.mongo.backend.TailableCursor;
import de.bwaldvogel.mongo.backend.aggregation.Aggregation;
import de.bwaldvogel.mongo.bson.BsonTimestamp;
import de.bwaldvogel.mongo.bson.Document;
import de.bwaldvogel.mongo.exception.MongoServerException;

public class ChangeStreamCursor implements TailableCursor {

private static final String FULL_DOCUMENT = "fullDocument";
private static final String OPERATION_TYPE = "operationType";
private static final String CLUSTER_TIME = "clusterTime";
private static final String DOCUMENT_KEY = "documentKey";

private final MongoBackend mongoBackend;
private final Document changeStreamDocument;
private final Aggregation aggregation;
private final OplogCursor oplogCursor;

ChangeStreamCursor(
MongoBackend mongoBackend,
Document changeStreamDocument,
Aggregation aggregation,
OplogCursor oplogCursor
) {
this.mongoBackend = mongoBackend;
this.changeStreamDocument = changeStreamDocument;
this.aggregation = aggregation;
this.oplogCursor = oplogCursor;
}

@Override
public long getId() {
return oplogCursor.getId();
}

@Override
public boolean isEmpty() {
return oplogCursor.isEmpty();
}

@Override
public List<Document> takeDocuments(int numberToReturn) {
return aggregation.runStagesAsStream(
oplogCursor.takeDocuments(numberToReturn).stream()
.map(this::toChangeStreamResponseDocument)
).collect(Collectors.toList());
}

@Override
public OplogPosition getPosition() {
return oplogCursor.getPosition();
}

private Document toChangeStreamResponseDocument(Document oplogDocument) {
OperationType operationType = OperationType.fromCode(oplogDocument.get("op").toString());
Document documentKey = new Document();
Document document = getUpdateDocument(oplogDocument);
BsonTimestamp timestamp = OplogUtils.getOplogTimestamp(oplogDocument);
OplogPosition oplogPosition = new OplogPosition(timestamp);
switch (operationType) {
case UPDATE:
case DELETE:
documentKey = document;
break;
case INSERT:
documentKey.append(OplogDocumentFields.ID, document.get(OplogDocumentFields.ID));
break;
case COMMAND:
return toChangeStreamCommandResponseDocument(oplogDocument, oplogPosition, timestamp);
default:
throw new IllegalArgumentException("Unexpected operation type: " + operationType);
}

return new Document()
.append(OplogDocumentFields.ID, new Document(OplogDocumentFields.ID_DATA_KEY, oplogPosition.toHexString()))
.append(OPERATION_TYPE, operationType.getDescription())
.append(FULL_DOCUMENT, getFullDocument(changeStreamDocument, oplogDocument, operationType))
.append(DOCUMENT_KEY, documentKey)
.append(CLUSTER_TIME, timestamp);
}

private Document toChangeStreamCommandResponseDocument(Document oplogDocument, OplogPosition oplogPosition, BsonTimestamp timestamp) {
Document document = getUpdateDocument(oplogDocument);
String operationType = document.keySet().stream().findFirst().orElseThrow(
() -> new MongoServerException("Unspecified command operation type")
);

return new Document()
.append(OplogDocumentFields.ID, new Document(OplogDocumentFields.ID_DATA_KEY, oplogPosition.toHexString()))
.append(OPERATION_TYPE, operationType)
.append(CLUSTER_TIME, timestamp);
}

private Document getFullDocument(Document changeStreamDocument, Document document, OperationType operationType) {
switch (operationType) {
case INSERT:
return getUpdateDocument(document);
case DELETE:
return null;
case UPDATE:
return lookUpUpdateDocument(changeStreamDocument, document);
}
throw new IllegalArgumentException("Invalid operation type");
}

private Document getUpdateDocument(Document document) {
return (Document) document.get(OplogDocumentFields.O);
}

private Document lookUpUpdateDocument(Document changeStreamDocument, Document document) {
Document deltaUpdate = getDeltaUpdate(getUpdateDocument(document));
if (changeStreamDocument.containsKey(FULL_DOCUMENT) && changeStreamDocument.get(FULL_DOCUMENT).equals("updateLookup")) {
String namespace = (String) document.get(OplogDocumentFields.NAMESPACE);
String databaseName = namespace.split("\\.")[0];
String collectionName = namespace.split("\\.")[1];
return mongoBackend.resolveDatabase(databaseName)
.resolveCollection(collectionName, true)
.queryAllAsStream()
.filter(d -> d.get(OplogDocumentFields.ID).equals(((Document) document.get(OplogDocumentFields.O2)).get(OplogDocumentFields.ID)))
.findFirst()
.orElse(deltaUpdate);
}
return deltaUpdate;
}

private Document getDeltaUpdate(Document updateDocument) {
Document delta = new Document();
if (updateDocument.containsKey("$set")) {
delta.appendAll((Document) updateDocument.get("$set"));
}
if (updateDocument.containsKey("$unset")) {
delta.appendAll((Document) updateDocument.get("$unset"));
}
return delta;
}

}
core/src/main/java/de/bwaldvogel/mongo/oplog/CollectionBackedOplog.java (23 additions, 106 deletions)
@@ -1,30 +1,27 @@
package de.bwaldvogel.mongo.oplog;

import java.util.List;
import java.util.Objects;
import java.util.UUID;
import java.util.function.Function;
import java.util.stream.Collectors;
import java.util.stream.Stream;

import de.bwaldvogel.mongo.MongoBackend;
import de.bwaldvogel.mongo.MongoCollection;
import de.bwaldvogel.mongo.backend.Cursor;
import de.bwaldvogel.mongo.backend.CursorRegistry;
import de.bwaldvogel.mongo.backend.TailableCursor;
import de.bwaldvogel.mongo.backend.Utils;
import de.bwaldvogel.mongo.backend.aggregation.Aggregation;
import de.bwaldvogel.mongo.bson.BsonTimestamp;
import de.bwaldvogel.mongo.bson.Document;
import de.bwaldvogel.mongo.exception.MongoServerException;

public class CollectionBackedOplog implements Oplog {

private static final long ELECTION_TERM = 1L;
private static final String START_AT_OPERATION_TIME = "startAtOperationTime";
private static final String FULL_DOCUMENT = "fullDocument";
private static final String START_AFTER = "startAfter";
private static final String RESUME_AFTER = "resumeAfter";
private static final String OPERATION_TYPE = "operationType";
private static final String CLUSTER_TIME = "clusterTime";
private static final String DOCUMENT_KEY = "documentKey";

private final OplogClock oplogClock;
private final MongoCollection<Document> collection;
@@ -83,21 +80,19 @@ public void handleDropCollection(String namespace) {
collection.addDocument(toOplogDropCollection(databaseName, collectionName));
}

private Stream<Document> streamOplog(Document changeStreamDocument, OplogPosition position, Aggregation aggregation,
String namespace) {
return aggregation.runStagesAsStream(collection.queryAllAsStream()
private Stream<Document> streamOplog(OplogPosition position, String namespace) {
return collection.queryAllAsStream()
.filter(document -> filterNamespace(document, namespace))
.filter(document -> {
BsonTimestamp timestamp = getOplogTimestamp(document);
BsonTimestamp timestamp = OplogUtils.getOplogTimestamp(document);
OplogPosition documentOplogPosition = new OplogPosition(timestamp);
return documentOplogPosition.isAfter(position);
})
.sorted((o1, o2) -> {
BsonTimestamp timestamp1 = getOplogTimestamp(o1);
BsonTimestamp timestamp2 = getOplogTimestamp(o2);
BsonTimestamp timestamp1 = OplogUtils.getOplogTimestamp(o1);
BsonTimestamp timestamp2 = OplogUtils.getOplogTimestamp(o2);
return timestamp1.compareTo(timestamp2);
})
.map(document -> toChangeStreamResponseDocument(document, changeStreamDocument)));
});
}

private static boolean filterNamespace(Document document, String namespace) {
@@ -110,7 +105,16 @@ private static boolean filterNamespace(Document document, String namespace) {
}

@Override
public Cursor createCursor(Document changeStreamDocument, String namespace, Aggregation aggregation) {
public OplogCursor createCursor(String namespace, OplogPosition initialOplogPosition) {
return new OplogCursor(
cursorRegistry.generateCursorId(),
position -> streamOplog(position, namespace),
initialOplogPosition
);
}

@Override
public TailableCursor createChangeStreamCursor(Document changeStreamDocument, String namespace, Aggregation aggregation) {
Document startAfter = (Document) changeStreamDocument.get(START_AFTER);
Document resumeAfter = (Document) changeStreamDocument.get(RESUME_AFTER);
BsonTimestamp startAtOperationTime = (BsonTimestamp) changeStreamDocument.get(START_AT_OPERATION_TIME);
@@ -123,7 +127,7 @@ public Cursor createCursor(Document changeStreamDocument, String namespace, Aggr
String collectionName = Utils.getCollectionNameFromFullName(namespace);
boolean resumeAfterTerminalEvent = collection.queryAllAsStream()
.filter(document -> {
BsonTimestamp timestamp = getOplogTimestamp(document);
BsonTimestamp timestamp = OplogUtils.getOplogTimestamp(document);
OplogPosition documentOplogPosition = new OplogPosition(timestamp);
return initialOplogPosition.isAfter(documentOplogPosition.inclusive());
})
@@ -141,9 +145,9 @@ public Cursor createCursor(Document changeStreamDocument, String namespace, Aggr
initialOplogPosition = new OplogPosition(oplogClock.now());
}

Function<OplogPosition, Stream<Document>> streamSupplier =
position -> streamOplog(changeStreamDocument, position, aggregation, namespace);
OplogCursor cursor = new OplogCursor(cursorRegistry.generateCursorId(), streamSupplier, initialOplogPosition);
OplogCursor oplogCursor = createCursor(namespace, initialOplogPosition);
ChangeStreamCursor cursor
= new ChangeStreamCursor(backend, changeStreamDocument, aggregation, oplogCursor);
cursorRegistry.add(cursor);
return cursor;
}
@@ -185,91 +189,4 @@ private boolean isOplogCollection(String namespace) {
return collection.getFullName().equals(namespace);
}

private Document getFullDocument(Document changeStreamDocument, Document document, OperationType operationType) {
switch (operationType) {
case INSERT:
return getUpdateDocument(document);
case DELETE:
return null;
case UPDATE:
return lookUpUpdateDocument(changeStreamDocument, document);
}
throw new IllegalArgumentException("Invalid operation type");
}

private Document lookUpUpdateDocument(Document changeStreamDocument, Document document) {
Document deltaUpdate = getDeltaUpdate(getUpdateDocument(document));
if (changeStreamDocument.containsKey(FULL_DOCUMENT) && changeStreamDocument.get(FULL_DOCUMENT).equals("updateLookup")) {
String namespace = (String) document.get(OplogDocumentFields.NAMESPACE);
String databaseName = namespace.split("\\.")[0];
String collectionName = namespace.split("\\.")[1];
return backend.resolveDatabase(databaseName)
.resolveCollection(collectionName, true)
.queryAllAsStream()
.filter(d -> d.get(OplogDocumentFields.ID).equals(((Document) document.get(OplogDocumentFields.O2)).get(OplogDocumentFields.ID)))
.findFirst()
.orElse(deltaUpdate);
}
return deltaUpdate;
}

private Document getDeltaUpdate(Document updateDocument) {
Document delta = new Document();
if (updateDocument.containsKey("$set")) {
delta.appendAll((Document) updateDocument.get("$set"));
}
if (updateDocument.containsKey("$unset")) {
delta.appendAll((Document) updateDocument.get("$unset"));
}
return delta;
}

private Document toChangeStreamResponseDocument(Document oplogDocument, Document changeStreamDocument) {
OperationType operationType = OperationType.fromCode(oplogDocument.get(OplogDocumentFields.OPERATION_TYPE).toString());
Document documentKey = new Document();
Document document = getUpdateDocument(oplogDocument);
BsonTimestamp timestamp = getOplogTimestamp(oplogDocument);
OplogPosition oplogPosition = new OplogPosition(timestamp);
switch (operationType) {
case UPDATE:
case DELETE:
documentKey = document;
break;
case INSERT:
documentKey.append(OplogDocumentFields.ID, document.get(OplogDocumentFields.ID));
break;
case COMMAND:
return toChangeStreamCommandResponseDocument(oplogDocument, oplogPosition, timestamp);
default:
throw new IllegalArgumentException("Unexpected operation type: " + operationType);
}

return new Document()
.append(OplogDocumentFields.ID, new Document(OplogDocumentFields.ID_DATA_KEY, oplogPosition.toHexString()))
.append(OPERATION_TYPE, operationType.getDescription())
.append(FULL_DOCUMENT, getFullDocument(changeStreamDocument, oplogDocument, operationType))
.append(DOCUMENT_KEY, documentKey)
.append(CLUSTER_TIME, timestamp);
}

private Document toChangeStreamCommandResponseDocument(Document oplogDocument, OplogPosition oplogPosition, BsonTimestamp timestamp) {
Document document = getUpdateDocument(oplogDocument);
String operationType = document.keySet().stream().findFirst().orElseThrow(
() -> new MongoServerException("Unspecified command operation type")
);

return new Document()
.append(OplogDocumentFields.ID, new Document(OplogDocumentFields.ID_DATA_KEY, oplogPosition.toHexString()))
.append(OPERATION_TYPE, operationType)
.append(CLUSTER_TIME, timestamp);
}

private static BsonTimestamp getOplogTimestamp(Document document) {
return (BsonTimestamp) document.get(OplogDocumentFields.TIMESTAMP);
}

private static Document getUpdateDocument(Document document) {
return (Document) document.get(OplogDocumentFields.O);
}

}
@@ -4,9 +4,10 @@
import java.util.List;

import de.bwaldvogel.mongo.backend.AbstractCursor;
import de.bwaldvogel.mongo.backend.TailableCursor;
import de.bwaldvogel.mongo.bson.Document;

class InvalidateOplogCursor extends AbstractCursor {
class InvalidateOplogCursor extends AbstractCursor implements TailableCursor {
private final OplogPosition position;

InvalidateOplogCursor(OplogPosition position) {
@@ -27,4 +28,8 @@ public List<Document> takeDocuments(int numberToReturn) {
return Collections.singletonList(result);
}

@Override
public OplogPosition getPosition() {
return null;
}
}