Skip to content

Commit

Permalink
Docs synced marker (#15)
Browse files Browse the repository at this point in the history
* add docks_synced metadata info to determine how many documents were synced

* removed redundant empty lines

* remove empty line
  • Loading branch information
TPRobots authored and raimondast committed Dec 4, 2018
1 parent 74de89c commit f4458cf
Show file tree
Hide file tree
Showing 3 changed files with 46 additions and 5 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -135,6 +135,25 @@ public CollectionId collectionId() {
return collectionId;
}

/**
 * Emits the source record(s) that represent a single document read from a collection, tagging the
 * source information with a synced-document count.
 *
 * @param id the identifier of the collection that holds the document; may not be null
 * @param object the document being recorded; may not be null
 * @param timestamp the timestamp at which this operation is occurring
 * @param docsSynced the number of documents synced so far; forwarded unchanged to
 *            {@code source.lastOffsetStruct(...)} when building the source struct
 * @return the count of source records produced; will be 0 or more
 * @throws InterruptedException if the calling thread was interrupted while waiting to submit a record to
 *             the blocking consumer
 */
public int recordObject(CollectionId id, Document object, long timestamp, long docsSynced) throws InterruptedException {
    // Source struct and offset are captured first, before the document key is derived.
    final Struct sourceInfo = source.lastOffsetStruct(replicaSetName, id, docsSynced);
    final Map<String, ?> lastOffset = source.lastOffset(replicaSetName);

    final String documentKey = idObjToJson(object);
    assert documentKey != null;

    // Every record produced here is emitted as a READ operation.
    return createRecords(sourceInfo, lastOffset, Operation.READ, documentKey, object, timestamp);
}

/**
* Generate and record one or more source records to describe the given object.
*
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -371,10 +371,12 @@ protected long copyCollection(MongoClient primary, CollectionId collectionId, lo
MongoCollection<Document> docCollection = db.getCollection(collectionId.name());
long counter = 0;
try (MongoCursor<Document> cursor = docCollection.find().iterator()) {
while (running.get() && cursor.hasNext()) {
boolean hasNext = cursor.hasNext();
while (running.get() && hasNext) {
Document doc = cursor.next();
logger.trace("Found existing doc in {}: {}", collectionId, doc);
counter += factory.recordObject(collectionId, doc, timestamp);
hasNext = cursor.hasNext();
counter += factory.recordObject(collectionId, doc, timestamp, !hasNext ? counter+1 : -1);
}
}
return counter;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -77,6 +77,7 @@ public final class SourceInfo extends AbstractSourceInfo {
public static final String ORDER = "ord";
public static final String OPERATION_ID = "h";
public static final String INITIAL_SYNC = "initsync";
public static final String DOCS_SYNCED = "docs_synced";

private static final BsonTimestamp INITIAL_TIMESTAMP = new BsonTimestamp();
private static final Position INITIAL_POSITION = new Position(INITIAL_TIMESTAMP, null);
Expand All @@ -95,6 +96,7 @@ public final class SourceInfo extends AbstractSourceInfo {
.field(ORDER, Schema.INT32_SCHEMA)
.field(OPERATION_ID, Schema.OPTIONAL_INT64_SCHEMA)
.field(INITIAL_SYNC, SchemaBuilder.bool().optional().defaultValue(false).build())
.field(DOCS_SYNCED, Schema.OPTIONAL_INT64_SCHEMA)
.build();

private final ConcurrentMap<String, Map<String, String>> sourcePartitionsByReplicaSetName = new ConcurrentHashMap<>();
Expand Down Expand Up @@ -221,7 +223,22 @@ public BsonTimestamp lastOffsetTimestamp(String replicaSetName) {
*/
public Struct lastOffsetStruct(String replicaSetName, CollectionId collectionId) {
return offsetStructFor(replicaSetName, collectionId.namespace(), positionsByReplicaSetName.get(replicaSetName),
isInitialSyncOngoing(replicaSetName));
isInitialSyncOngoing(replicaSetName), -1);
}

/**
 * Build a {@link Struct} describing the source {@link #partition(String) partition} and the
 * {@link #lastOffset(String) offset} last read for a replica set, additionally carrying a
 * synced-document count. The returned Struct complies with the {@link #schema} for the MongoDB connector.
 *
 * @param replicaSetName the name of the replica set for which the offset is to be obtained; may not be null
 * @param collectionId the event's collection identifier; may not be null
 * @param docsSynced number of documents synced; handed on as-is to the struct builder
 * @return the source partition and offset {@link Struct}; never null
 * @see #schema()
 */
public Struct lastOffsetStruct(String replicaSetName, CollectionId collectionId, long docsSynced) {
    final String namespace = collectionId.namespace();
    return offsetStructFor(
            replicaSetName,
            namespace,
            positionsByReplicaSetName.get(replicaSetName),
            isInitialSyncOngoing(replicaSetName),
            docsSynced);
}

/**
Expand All @@ -244,7 +261,7 @@ public Struct offsetStructForEvent(String replicaSetName, Document oplogEvent) {
namespace = oplogEvent.getString("ns");
}
positionsByReplicaSetName.put(replicaSetName, position);
return offsetStructFor(replicaSetName, namespace, position, isInitialSyncOngoing(replicaSetName));
return offsetStructFor(replicaSetName, namespace, position, isInitialSyncOngoing(replicaSetName), -1);
}

/**
Expand All @@ -257,7 +274,7 @@ protected static BsonTimestamp extractEventTimestamp(Document oplogEvent) {
return oplogEvent != null ? oplogEvent.get("ts", BsonTimestamp.class) : null;
}

private Struct offsetStructFor(String replicaSetName, String namespace, Position position, boolean isInitialSync) {
private Struct offsetStructFor(String replicaSetName, String namespace, Position position, boolean isInitialSync, long docsSynced) {
if (position == null) position = INITIAL_POSITION;
Struct result = super.struct();
result.put(SERVER_NAME, serverName);
Expand All @@ -269,6 +286,9 @@ private Struct offsetStructFor(String replicaSetName, String namespace, Position
if (isInitialSync) {
result.put(INITIAL_SYNC, true);
}
if (docsSynced > 0) {
result.put(DOCS_SYNCED, docsSynced);
}
return result;
}

Expand Down

0 comments on commit f4458cf

Please sign in to comment.