diff --git a/src/ChangeStream.php b/src/ChangeStream.php index 7bdf5f8fe..06ce78b4d 100644 --- a/src/ChangeStream.php +++ b/src/ChangeStream.php @@ -220,7 +220,7 @@ private function onIteration($incrementKey) */ private function resume() { - $this->iterator = call_user_func($this->resumeCallable, $this->getResumeToken()); + $this->iterator = call_user_func($this->resumeCallable, $this->getResumeToken(), $this->hasAdvanced); $this->iterator->rewind(); $this->onIteration($this->hasAdvanced); diff --git a/src/Operation/Watch.php b/src/Operation/Watch.php index 5a4d0450f..7086f8f1c 100644 --- a/src/Operation/Watch.php +++ b/src/Operation/Watch.php @@ -250,7 +250,7 @@ public function execute(Server $server) { return new ChangeStream( $this->createChangeStreamIterator($server), - function($resumeToken) { return $this->resume($resumeToken); } + function($resumeToken, $hasAdvanced) { return $this->resume($resumeToken, $hasAdvanced); } ); } @@ -333,10 +333,11 @@ private function getInitialResumeToken() * * @see https://github.com/mongodb/specifications/blob/master/source/change-streams/change-streams.rst#resume-process * @param array|object|null $resumeToken + * @param bool $hasAdvanced * @return ChangeStreamIterator * @throws InvalidArgumentException */ - private function resume($resumeToken = null) + private function resume($resumeToken = null, $hasAdvanced = false) { if (isset($resumeToken) && ! is_array($resumeToken) && ! is_object($resumeToken)) { throw InvalidArgumentException::invalidType('$resumeToken', $resumeToken, 'array or object'); @@ -347,12 +348,14 @@ private function resume($resumeToken = null) // Select a new server using the original read preference $server = $this->manager->selectServer($this->aggregateOptions['readPreference']); + $resumeOption = isset($this->changeStreamOptions['startAfter']) && !$hasAdvanced ? 'startAfter' : 'resumeAfter'; + unset($this->changeStreamOptions['resumeAfter']); unset($this->changeStreamOptions['startAfter']); unset($this->changeStreamOptions['startAtOperationTime']); if ($resumeToken !== null) { - $this->changeStreamOptions['resumeAfter'] = $resumeToken; + $this->changeStreamOptions[$resumeOption] = $resumeToken; } if ($resumeToken === null && $this->operationTime !== null) { diff --git a/tests/Operation/WatchFunctionalTest.php b/tests/Operation/WatchFunctionalTest.php index c3bba0051..093e7a9e8 100644 --- a/tests/Operation/WatchFunctionalTest.php +++ b/tests/Operation/WatchFunctionalTest.php @@ -2,9 +2,12 @@ namespace MongoDB\Tests\Operation; +use Closure; use MongoDB\ChangeStream; use MongoDB\BSON\TimestampInterface; use MongoDB\Driver\Cursor; +use MongoDB\Driver\Exception\CommandException; +use MongoDB\Driver\Exception\ConnectionTimeoutException; use MongoDB\Driver\Manager; use MongoDB\Driver\ReadPreference; use MongoDB\Driver\Server; @@ -25,6 +28,8 @@ class WatchFunctionalTest extends FunctionalTestCase { use SetUpTearDownTrait; + const NOT_MASTER = 10107; + private static $wireVersionForStartAtOperationTime = 7; private $defaultOptions = ['maxAwaitTimeMS' => 500]; @@ -890,9 +895,11 @@ public function testRewindExtractsResumeTokenAndNextResumes() $changeStream->next(); $this->assertTrue($changeStream->valid()); - $options = ['resumeAfter' => $changeStream->current()->_id] + $this->defaultOptions; + $resumeToken = $changeStream->current()->_id; + $options = ['resumeAfter' => $resumeToken] + $this->defaultOptions; $operation = new Watch($this->manager, $this->getDatabaseName(), $this->getCollectionName(), [], $options); $changeStream = $operation->execute($this->getPrimaryServer()); + $this->assertSame($resumeToken, $changeStream->getResumeToken()); $changeStream->rewind(); $this->assertTrue($changeStream->valid()); @@ -979,6 +986,7 @@ public function testStartAfterOption() $options = $this->defaultOptions + ['startAfter' => $resumeToken]; $operation = new Watch($this->manager, $this->getDatabaseName(), $this->getCollectionName(), [], $options); $changeStream = $operation->execute($this->getPrimaryServer()); + $this->assertSame($resumeToken, $changeStream->getResumeToken()); $changeStream->rewind(); $this->assertTrue($changeStream->valid()); @@ -1193,6 +1201,286 @@ public function testSessionFreed() $this->assertNull($rp->getValue($changeStream)); } + /** + * Prose test: "ChangeStream will automatically resume one time on a + * resumable error (including not master) with the initial pipeline and + * options, except for the addition/update of a resumeToken." + */ + public function testResumeRepeatsOriginalPipelineAndOptions() + { + $operation = new Watch($this->manager, $this->getDatabaseName(), $this->getCollectionName(), [], $this->defaultOptions); + + $aggregateCommands = []; + + $this->configureFailPoint([ + 'configureFailPoint' => 'failCommand', + 'mode' => ['times' => 1], + 'data' => ['failCommands' => ['getMore'], 'errorCode' => self::NOT_MASTER], + ]); + + (new CommandObserver)->observe( + function() use ($operation) { + $changeStream = $operation->execute($this->getPrimaryServer()); + + // The first next will hit the fail point, causing a resume + $changeStream->next(); + $changeStream->next(); + }, + function(array $event) use (&$aggregateCommands) { + $command = $event['started']->getCommand(); + if ($event['started']->getCommandName() !== 'aggregate') { + return; + } + + $aggregateCommands[] = (array) $command; + } + ); + + $this->assertCount(2, $aggregateCommands); + + $this->assertThat( + $aggregateCommands[0]['pipeline'][0]->{'$changeStream'}, + $this->logicalNot( + $this->logicalOr( + $this->objectHasAttribute('resumeAfter'), + $this->objectHasAttribute('startAfter'), + $this->objectHasAttribute('startAtOperationTime') + ) + ) + ); + + $this->assertThat( + $aggregateCommands[1]['pipeline'][0]->{'$changeStream'}, + $this->logicalOr( + $this->objectHasAttribute('resumeAfter'), + $this->objectHasAttribute('startAfter'), + $this->objectHasAttribute('startAtOperationTime') + ) + ); + + $aggregateCommands = array_map( + function (array $aggregateCommand) { + // Remove resume options from the changestream document + if (isset($aggregateCommand['pipeline'][0]->{'$changeStream'})) { + $aggregateCommand['pipeline'][0]->{'$changeStream'} = array_diff_key( + (array) $aggregateCommand['pipeline'][0]->{'$changeStream'}, + ['resumeAfter' => false, 'startAfter' => false, 'startAtOperationTime' => false] + ); + } + + // Remove options we don't want to compare between commands + return array_diff_key($aggregateCommand, ['lsid' => false, '$clusterTime' => false]); + }, + $aggregateCommands + ); + + // Ensure options in original and resuming aggregate command match + $this->assertEquals($aggregateCommands[0], $aggregateCommands[1]); + } + + /** + * Prose test: "ChangeStream will not attempt to resume on any error + * encountered while executing an aggregate command." + */ + public function testErrorDuringAggregateCommandDoesNotCauseResume() + { + if (version_compare($this->getServerVersion(), '4.0.0', '<')) { + $this->markTestSkipped('failCommand is not supported'); + } + + $operation = new Watch($this->manager, $this->getDatabaseName(), $this->getCollectionName(), [], $this->defaultOptions); + + $commandCount = 0; + + $this->configureFailPoint([ + 'configureFailPoint' => 'failCommand', + 'mode' => ['times' => 1], + 'data' => ['failCommands' => ['aggregate'], 'errorCode' => self::NOT_MASTER], + ]); + + $this->expectException(CommandException::class); + + (new CommandObserver)->observe( + function() use ($operation) { + $operation->execute($this->getPrimaryServer()); + }, + function(array $event) use (&$commandCount) { + $commandCount++; + } + ); + + $this->assertSame(1, $commandCount); + } + + /** + * Prose test: "ChangeStream will perform server selection before attempting + * to resume, using initial readPreference" + */ + public function testOriginalReadPreferenceIsPreservedOnResume() + { + $readPreference = new ReadPreference('secondary'); + $options = ['readPreference' => $readPreference] + $this->defaultOptions; + $operation = new Watch($this->manager, $this->getDatabaseName(), $this->getCollectionName(), [], $options); + + try { + $secondary = $this->manager->selectServer($readPreference); + } catch (ConnectionTimeoutException $e) { + $this->markTestSkipped('Secondary is not available'); + } + + $changeStream = $operation->execute($secondary); + $previousCursorId = $changeStream->getCursorId(); + $this->killChangeStreamCursor($changeStream); + + $changeStream->next(); + $this->assertNotSame($previousCursorId, $changeStream->getCursorId()); + + $getCursor = Closure::bind( + function () { + return $this->iterator->getInnerIterator(); + }, + $changeStream, + ChangeStream::class + ); + /** @var Cursor $cursor */ + $cursor = $getCursor(); + self::assertTrue($cursor->getServer()->isSecondary()); + } + + /** + * Prose test + * For a ChangeStream under these conditions: + * - Running against a server <4.0.7. + * - The batch is empty or has been iterated to the last document. + * Expected result: + * - getResumeToken must return the _id of the last document returned if one exists. + * - getResumeToken must return resumeAfter from the initial aggregate if the option was specified. + * - If resumeAfter was not specified, the getResumeToken result must be empty. + */ + public function testGetResumeTokenReturnsOriginalResumeTokenOnEmptyBatch() + { + if ($this->isPostBatchResumeTokenSupported()) { + $this->markTestSkipped('postBatchResumeToken is supported'); + } + + $operation = new Watch($this->manager, $this->getDatabaseName(), $this->getCollectionName(), [], $this->defaultOptions); + $changeStream = $operation->execute($this->getPrimaryServer()); + + $this->assertNull($changeStream->getResumeToken()); + + $this->insertDocument(['x' => 1]); + + $changeStream->next(); + $this->assertTrue($changeStream->valid()); + $resumeToken = $changeStream->getResumeToken(); + $this->assertSame($resumeToken, $changeStream->current()->_id); + + $options = ['resumeAfter' => $resumeToken] + $this->defaultOptions; + $operation = new Watch($this->manager, $this->getDatabaseName(), $this->getCollectionName(), [], $options); + $changeStream = $operation->execute($this->getPrimaryServer()); + + $this->assertSame($resumeToken, $changeStream->getResumeToken()); + } + + /** + * Prose test: "$changeStream stage for ChangeStream started with startAfter + * against a server >=4.1.1 that has not received any results yet MUST + * include a startAfter option and MUST NOT include a resumeAfter option + * when resuming a change stream." + */ + public function testResumingChangeStreamWithoutPreviousResultsIncludesStartAfterOption() + { + if (version_compare($this->getServerVersion(), '4.1.1', '<')) { + $this->markTestSkipped('Testing resumeAfter and startAfter can only be tested on servers >= 4.1.1'); + } + + $operation = new Watch($this->manager, $this->getDatabaseName(), $this->getCollectionName(), [], $this->defaultOptions); + $changeStream = $operation->execute($this->getPrimaryServer()); + + $this->insertDocument(['x' => 1]); + + $changeStream->next(); + $this->assertTrue($changeStream->valid()); + $resumeToken = $changeStream->getResumeToken(); + + $options = ['startAfter' => $resumeToken] + $this->defaultOptions; + $operation = new Watch($this->manager, $this->getDatabaseName(), $this->getCollectionName(), [], $options); + $changeStream = $operation->execute($this->getPrimaryServer()); + $changeStream->rewind(); + $this->killChangeStreamCursor($changeStream); + + $aggregateCommand = null; + + (new CommandObserver)->observe( + function() use ($changeStream) { + $changeStream->next(); + }, + function(array $event) use (&$aggregateCommand) { + if ($event['started']->getCommandName() !== 'aggregate') { + return; + } + + $aggregateCommand = $event['started']->getCommand(); + } + ); + + $this->assertNotNull($aggregateCommand); + $this->assertObjectNotHasAttribute('resumeAfter', $aggregateCommand->pipeline[0]->{'$changeStream'}); + $this->assertObjectHasAttribute('startAfter', $aggregateCommand->pipeline[0]->{'$changeStream'}); + } + + /** + * Prose test: "$changeStream stage for ChangeStream started with startAfter + * against a server >=4.1.1 that has received at least one result MUST + * include a resumeAfter option and MUST NOT include a startAfter option + * when resuming a change stream." + */ + public function testResumingChangeStreamWithPreviousResultsIncludesResumeAfterOption() + { + if (version_compare($this->getServerVersion(), '4.1.1', '<')) { + $this->markTestSkipped('Testing resumeAfter and startAfter can only be tested on servers >= 4.1.1'); + } + + $operation = new Watch($this->manager, $this->getDatabaseName(), $this->getCollectionName(), [], $this->defaultOptions); + $changeStream = $operation->execute($this->getPrimaryServer()); + + $this->insertDocument(['x' => 1]); + + $changeStream->next(); + $this->assertTrue($changeStream->valid()); + $resumeToken = $changeStream->getResumeToken(); + + $options = ['startAfter' => $resumeToken] + $this->defaultOptions; + $operation = new Watch($this->manager, $this->getDatabaseName(), $this->getCollectionName(), [], $options); + $changeStream = $operation->execute($this->getPrimaryServer()); + $changeStream->rewind(); + + $this->insertDocument(['x' => 2]); + $changeStream->next(); + $this->assertTrue($changeStream->valid()); + + $this->killChangeStreamCursor($changeStream); + + $aggregateCommand = null; + + (new CommandObserver)->observe( + function() use ($changeStream) { + $changeStream->next(); + }, + function(array $event) use (&$aggregateCommand) { + if ($event['started']->getCommandName() !== 'aggregate') { + return; + } + + $aggregateCommand = $event['started']->getCommand(); + } + ); + + $this->assertNotNull($aggregateCommand); + $this->assertObjectNotHasAttribute('startAfter', $aggregateCommand->pipeline[0]->{'$changeStream'}); + $this->assertObjectHasAttribute('resumeAfter', $aggregateCommand->pipeline[0]->{'$changeStream'}); + } + private function assertNoCommandExecuted(callable $callable) { $commands = [];