From f4458cfd71ec395f6c28d3f472408a5a74d77be8 Mon Sep 17 00:00:00 2001 From: Trustpilot Robot User Date: Tue, 4 Dec 2018 10:55:10 +0100 Subject: [PATCH] Docks synced marker (#15) * add docks_synced metadata info to determine how much documents were synced * removed redundant empty lines * remove empty line --- .../connector/mongodb/RecordMakers.java | 19 ++++++++++++++ .../connector/mongodb/Replicator.java | 6 +++-- .../connector/mongodb/SourceInfo.java | 26 ++++++++++++++++--- 3 files changed, 46 insertions(+), 5 deletions(-) diff --git a/debezium-connector-mongodb/src/main/java/io/debezium/connector/mongodb/RecordMakers.java b/debezium-connector-mongodb/src/main/java/io/debezium/connector/mongodb/RecordMakers.java index a2166ae429c..cf05e24cfd8 100644 --- a/debezium-connector-mongodb/src/main/java/io/debezium/connector/mongodb/RecordMakers.java +++ b/debezium-connector-mongodb/src/main/java/io/debezium/connector/mongodb/RecordMakers.java @@ -135,6 +135,25 @@ public CollectionId collectionId() { return collectionId; } + /** + * Generate and record one or more source records to describe the given object. + * + * @param id the identifier of the collection in which the document exists; may not be null + * @param object the document; may not be null + * @param timestamp the timestamp at which this operation is occurring + * @param docsSynced number of documents synced + * @return the number of source records that were generated; will be 0 or more + * @throws InterruptedException if the calling thread was interrupted while waiting to submit a record to + * the blocking consumer + */ + public int recordObject(CollectionId id, Document object, long timestamp, long docsSynced) throws InterruptedException { + final Struct sourceValue = source.lastOffsetStruct(replicaSetName, id, docsSynced); + final Map offset = source.lastOffset(replicaSetName); + String objId = idObjToJson(object); + assert objId != null; + return createRecords(sourceValue, offset, Operation.READ, objId, object, timestamp); + } + /** * Generate and record one or more source records to describe the given object. * diff --git a/debezium-connector-mongodb/src/main/java/io/debezium/connector/mongodb/Replicator.java b/debezium-connector-mongodb/src/main/java/io/debezium/connector/mongodb/Replicator.java index 9f441be3486..699d941d7f3 100644 --- a/debezium-connector-mongodb/src/main/java/io/debezium/connector/mongodb/Replicator.java +++ b/debezium-connector-mongodb/src/main/java/io/debezium/connector/mongodb/Replicator.java @@ -371,10 +371,12 @@ protected long copyCollection(MongoClient primary, CollectionId collectionId, lo MongoCollection docCollection = db.getCollection(collectionId.name()); long counter = 0; try (MongoCursor cursor = docCollection.find().iterator()) { - while (running.get() && cursor.hasNext()) { + boolean hasNext = cursor.hasNext(); + while (running.get() && hasNext) { Document doc = cursor.next(); logger.trace("Found existing doc in {}: {}", collectionId, doc); - counter += factory.recordObject(collectionId, doc, timestamp); + hasNext = cursor.hasNext(); + counter += factory.recordObject(collectionId, doc, timestamp, !hasNext ? counter+1 : -1); } } return counter; diff --git a/debezium-connector-mongodb/src/main/java/io/debezium/connector/mongodb/SourceInfo.java b/debezium-connector-mongodb/src/main/java/io/debezium/connector/mongodb/SourceInfo.java index 0172b1c6898..d3ea98c566a 100644 --- a/debezium-connector-mongodb/src/main/java/io/debezium/connector/mongodb/SourceInfo.java +++ b/debezium-connector-mongodb/src/main/java/io/debezium/connector/mongodb/SourceInfo.java @@ -77,6 +77,7 @@ public final class SourceInfo extends AbstractSourceInfo { public static final String ORDER = "ord"; public static final String OPERATION_ID = "h"; public static final String INITIAL_SYNC = "initsync"; + public static final String DOCS_SYNCED = "docs_synced"; private static final BsonTimestamp INITIAL_TIMESTAMP = new BsonTimestamp(); private static final Position INITIAL_POSITION = new Position(INITIAL_TIMESTAMP, null); @@ -95,6 +96,7 @@ public final class SourceInfo extends AbstractSourceInfo { .field(ORDER, Schema.INT32_SCHEMA) .field(OPERATION_ID, Schema.OPTIONAL_INT64_SCHEMA) .field(INITIAL_SYNC, SchemaBuilder.bool().optional().defaultValue(false).build()) + .field(DOCS_SYNCED, Schema.OPTIONAL_INT64_SCHEMA) .build(); private final ConcurrentMap> sourcePartitionsByReplicaSetName = new ConcurrentHashMap<>(); @@ -221,7 +223,22 @@ public BsonTimestamp lastOffsetTimestamp(String replicaSetName) { */ public Struct lastOffsetStruct(String replicaSetName, CollectionId collectionId) { return offsetStructFor(replicaSetName, collectionId.namespace(), positionsByReplicaSetName.get(replicaSetName), - isInitialSyncOngoing(replicaSetName)); + isInitialSyncOngoing(replicaSetName), -1); + } + + /** + * Get a {@link Struct} representation of the source {@link #partition(String) partition} and {@link #lastOffset(String) + * offset} information where we have last read. The Struct complies with the {@link #schema} for the MongoDB connector. + * + * @param replicaSetName the name of the replica set name for which the new offset is to be obtained; may not be null + * @param collectionId the event's collection identifier; may not be null + * @param docsSynced number of documents synced + * @return the source partition and offset {@link Struct}; never null + * @see #schema() + */ + public Struct lastOffsetStruct(String replicaSetName, CollectionId collectionId, long docsSynced) { + return offsetStructFor(replicaSetName, collectionId.namespace(), positionsByReplicaSetName.get(replicaSetName), + isInitialSyncOngoing(replicaSetName), docsSynced); } /** @@ -244,7 +261,7 @@ public Struct offsetStructForEvent(String replicaSetName, Document oplogEvent) { namespace = oplogEvent.getString("ns"); } positionsByReplicaSetName.put(replicaSetName, position); - return offsetStructFor(replicaSetName, namespace, position, isInitialSyncOngoing(replicaSetName)); + return offsetStructFor(replicaSetName, namespace, position, isInitialSyncOngoing(replicaSetName), -1); } /** @@ -257,7 +274,7 @@ protected static BsonTimestamp extractEventTimestamp(Document oplogEvent) { return oplogEvent != null ? oplogEvent.get("ts", BsonTimestamp.class) : null; } - private Struct offsetStructFor(String replicaSetName, String namespace, Position position, boolean isInitialSync) { + private Struct offsetStructFor(String replicaSetName, String namespace, Position position, boolean isInitialSync, long docsSynced) { if (position == null) position = INITIAL_POSITION; Struct result = super.struct(); result.put(SERVER_NAME, serverName); @@ -269,6 +286,9 @@ private Struct offsetStructFor(String replicaSetName, String namespace, Position if (isInitialSync) { result.put(INITIAL_SYNC, true); } + if (docsSynced > 0) { + result.put(DOCS_SYNCED, docsSynced); + } return result; }