Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Docs synced marker #15

Merged
merged 3 commits into from
Dec 4, 2018
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -135,6 +135,25 @@ public CollectionId collectionId() {
return collectionId;
}

/**
 * Generate and record one or more source records describing the given document,
 * tagging the resulting offset with the number of documents synced so far.
 *
 * @param id the identifier of the collection in which the document exists; may not be null
 * @param object the document; may not be null
 * @param timestamp the timestamp at which this operation is occurring
 * @param docsSynced number of documents synced
 * @return the number of source records that were generated; will be 0 or more
 * @throws InterruptedException if the calling thread was interrupted while waiting to submit a record to
 *             the blocking consumer
 */
public int recordObject(CollectionId id, Document object, long timestamp, long docsSynced) throws InterruptedException {
    // Serialize the document's identifier up front; a null JSON id would be a programming error.
    final String jsonId = idObjToJson(object);
    assert jsonId != null;
    final Struct offsetStruct = source.lastOffsetStruct(replicaSetName, id, docsSynced);
    final Map<String, ?> lastOffset = source.lastOffset(replicaSetName);
    return createRecords(offsetStruct, lastOffset, Operation.READ, jsonId, object, timestamp);
}

/**
* Generate and record one or more source records to describe the given object.
*
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -371,10 +371,12 @@ protected long copyCollection(MongoClient primary, CollectionId collectionId, lo
MongoCollection<Document> docCollection = db.getCollection(collectionId.name());
long counter = 0;
try (MongoCursor<Document> cursor = docCollection.find().iterator()) {
while (running.get() && cursor.hasNext()) {
boolean hasNext = cursor.hasNext();
while (running.get() && hasNext) {
Document doc = cursor.next();
logger.trace("Found existing doc in {}: {}", collectionId, doc);
counter += factory.recordObject(collectionId, doc, timestamp);
hasNext = cursor.hasNext();
counter += factory.recordObject(collectionId, doc, timestamp, !hasNext ? counter+1 : -1);
}
}
return counter;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -77,6 +77,7 @@ public final class SourceInfo extends AbstractSourceInfo {
// Field names used in the source-info Struct and offset maps.
// NOTE(review): "ord" and "h" presumably map to the oplog entry's order and operation id — confirm against the oplog event parsing.
public static final String ORDER = "ord";
public static final String OPERATION_ID = "h";
// Present (true) only while an initial sync is ongoing for the replica set.
public static final String INITIAL_SYNC = "initsync";
// Number of documents synced during the initial sync; only recorded when positive.
public static final String DOCS_SYNCED = "docs_synced";

private static final BsonTimestamp INITIAL_TIMESTAMP = new BsonTimestamp();
private static final Position INITIAL_POSITION = new Position(INITIAL_TIMESTAMP, null);
Expand All @@ -95,6 +96,7 @@ public final class SourceInfo extends AbstractSourceInfo {
.field(ORDER, Schema.INT32_SCHEMA)
.field(OPERATION_ID, Schema.OPTIONAL_INT64_SCHEMA)
.field(INITIAL_SYNC, SchemaBuilder.bool().optional().defaultValue(false).build())
.field(DOCS_SYNCED, Schema.OPTIONAL_INT64_SCHEMA)
.build();

private final ConcurrentMap<String, Map<String, String>> sourcePartitionsByReplicaSetName = new ConcurrentHashMap<>();
Expand Down Expand Up @@ -221,7 +223,22 @@ public BsonTimestamp lastOffsetTimestamp(String replicaSetName) {
*/
public Struct lastOffsetStruct(String replicaSetName, CollectionId collectionId) {
return offsetStructFor(replicaSetName, collectionId.namespace(), positionsByReplicaSetName.get(replicaSetName),
isInitialSyncOngoing(replicaSetName));
isInitialSyncOngoing(replicaSetName), -1);
}

/**
 * Get a {@link Struct} representation of the source {@link #partition(String) partition} and
 * {@link #lastOffset(String) offset} information where we have last read, including the count
 * of documents synced so far. The Struct complies with the {@link #schema} for the MongoDB connector.
 *
 * @param replicaSetName the name of the replica set name for which the new offset is to be obtained; may not be null
 * @param collectionId the event's collection identifier; may not be null
 * @param docsSynced number of documents synced
 * @return the source partition and offset {@link Struct}; never null
 * @see #schema()
 */
public Struct lastOffsetStruct(String replicaSetName, CollectionId collectionId, long docsSynced) {
    final Position lastPosition = positionsByReplicaSetName.get(replicaSetName);
    final boolean initialSyncOngoing = isInitialSyncOngoing(replicaSetName);
    return offsetStructFor(replicaSetName, collectionId.namespace(), lastPosition, initialSyncOngoing, docsSynced);
}

/**
Expand All @@ -244,7 +261,7 @@ public Struct offsetStructForEvent(String replicaSetName, Document oplogEvent) {
namespace = oplogEvent.getString("ns");
}
positionsByReplicaSetName.put(replicaSetName, position);
return offsetStructFor(replicaSetName, namespace, position, isInitialSyncOngoing(replicaSetName));
return offsetStructFor(replicaSetName, namespace, position, isInitialSyncOngoing(replicaSetName), -1);
}

/**
Expand All @@ -257,7 +274,7 @@ protected static BsonTimestamp extractEventTimestamp(Document oplogEvent) {
return oplogEvent != null ? oplogEvent.get("ts", BsonTimestamp.class) : null;
}

private Struct offsetStructFor(String replicaSetName, String namespace, Position position, boolean isInitialSync) {
private Struct offsetStructFor(String replicaSetName, String namespace, Position position, boolean isInitialSync, long docsSynced) {
if (position == null) position = INITIAL_POSITION;
Struct result = super.struct();
result.put(SERVER_NAME, serverName);
Expand All @@ -269,6 +286,9 @@ private Struct offsetStructFor(String replicaSetName, String namespace, Position
if (isInitialSync) {
result.put(INITIAL_SYNC, true);
}
if (docsSynced > 0) {
result.put(DOCS_SYNCED, docsSynced);
}
return result;
}

Expand Down