Skip to content

Commit

Permalink
Ensure that resumeToken is included on resume attempts (#634)
Browse files Browse the repository at this point in the history
  • Loading branch information
jyemin committed Jan 20, 2021
1 parent 0e556f1 commit 0b0ffc3
Show file tree
Hide file tree
Showing 4 changed files with 32 additions and 10 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -61,7 +61,11 @@ public boolean hasNext() {
return resumeableOperation(new Function<AggregateResponseBatchCursor<RawBsonDocument>, Boolean>() {
@Override
public Boolean apply(final AggregateResponseBatchCursor<RawBsonDocument> queryBatchCursor) {
return queryBatchCursor.hasNext();
try {
return queryBatchCursor.hasNext();
} finally {
cachePostBatchResumeToken(queryBatchCursor);
}
}
});
}
Expand All @@ -71,9 +75,11 @@ public List<T> next() {
return resumeableOperation(new Function<AggregateResponseBatchCursor<RawBsonDocument>, List<T>>() {
@Override
public List<T> apply(final AggregateResponseBatchCursor<RawBsonDocument> queryBatchCursor) {
List<T> results = convertResults(queryBatchCursor.next());
cachePostBatchResumeToken(queryBatchCursor);
return results;
try {
return convertResults(queryBatchCursor.next());
} finally {
cachePostBatchResumeToken(queryBatchCursor);
}
}
});
}
Expand All @@ -83,9 +89,11 @@ public List<T> tryNext() {
return resumeableOperation(new Function<AggregateResponseBatchCursor<RawBsonDocument>, List<T>>() {
@Override
public List<T> apply(final AggregateResponseBatchCursor<RawBsonDocument> queryBatchCursor) {
List<T> results = convertResults(queryBatchCursor.tryNext());
cachePostBatchResumeToken(queryBatchCursor);
return results;
try {
return convertResults(queryBatchCursor.tryNext());
} finally {
cachePostBatchResumeToken(queryBatchCursor);
}
}
});
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -99,6 +99,7 @@ class QueryBatchCursor<T> implements AggregateResponseBatchCursor<T> {
this.decoder = notNull("decoder", decoder);
if (result != null) {
this.operationTime = result.getTimestamp(OPERATION_TIME, null);
this.postBatchResumeToken = getPostBatchResumeTokenFromResponse(result);
}
if (firstQueryResult.getCursor() != null) {
notNull("connectionSource", connectionSource);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -157,21 +157,32 @@ class OperationFunctionalSpecification extends Specification {
}

def next(cursor, boolean async, int minimumCount) {
next(cursor, async, false, minimumCount)
}

def next(cursor, boolean async, boolean callHasNextBeforeNext, int minimumCount) {
List<BsonDocument> retVal = []

while (retVal.size() < minimumCount) {
retVal.addAll(next(cursor, async))
retVal.addAll(doNext(cursor, async, callHasNextBeforeNext))
}

retVal
}

def next(cursor, boolean async) {
doNext(cursor, async, false)
}

def doNext(cursor, boolean async, boolean callHasNextBeforeNext) {
if (async) {
def futureResultCallback = new FutureResultCallback<List<BsonDocument>>()
cursor.next(futureResultCallback)
futureResultCallback.get(TIMEOUT, TimeUnit.SECONDS)
} else {
if (callHasNextBeforeNext) {
cursor.hasNext()
}
cursor.next()
}
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -394,8 +394,10 @@ public void testGetResumeTokenReturnsPostBatchResumeTokenAfterGetMore()
// use reflection to access the postBatchResumeToken
AggregateResponseBatchCursor<?> batchCursor = getBatchCursor(cursor);

// check equality in the case where the batch has not been iterated at all
assertEquals(cursor.getResumeToken(), batchCursor.getPostBatchResumeToken());
assertNotNull(batchCursor.getPostBatchResumeToken());

// resume token should be null before iteration
assertNull(cursor.getResumeToken());

cursor.next();
assertEquals(cursor.getResumeToken(), batchCursor.getPostBatchResumeToken());
Expand Down

0 comments on commit 0b0ffc3

Please sign in to comment.