From 32bade4780e432251e448a34ccf39b39baea0e33 Mon Sep 17 00:00:00 2001 From: Andreas Braun Date: Fri, 2 Aug 2019 13:24:31 +0200 Subject: [PATCH 1/2] PHPLIB-461: Fix ChangeStream tests on sharded clusters --- tests/DocumentationExamplesTest.php | 4 + tests/FunctionalTestCase.php | 3 - tests/Operation/WatchFunctionalTest.php | 186 ++++++++++++++++-------- 3 files changed, 127 insertions(+), 66 deletions(-) diff --git a/tests/DocumentationExamplesTest.php b/tests/DocumentationExamplesTest.php index c0f0ca2fc..60efc665c 100644 --- a/tests/DocumentationExamplesTest.php +++ b/tests/DocumentationExamplesTest.php @@ -935,6 +935,10 @@ public function testChangeStreamExample_1_4() { $this->skipIfChangeStreamIsNotSupported(); + if ($this->isShardedCluster()) { + $this->markTestSkipped('Test does not apply on sharded clusters: need more than a single getMore call on the change stream.'); + } + $db = new Database($this->manager, $this->getDatabaseName()); $db->dropCollection('inventory'); $db->createCollection('inventory'); diff --git a/tests/FunctionalTestCase.php b/tests/FunctionalTestCase.php index 8c5028f95..c15c3601c 100644 --- a/tests/FunctionalTestCase.php +++ b/tests/FunctionalTestCase.php @@ -336,9 +336,6 @@ protected function skipIfChangeStreamIsNotSupported() if (! $this->isShardedClusterUsingReplicasets()) { $this->markTestSkipped('$changeStream is only supported with replicasets'); } - - // Temporarily skip tests because of an issue with change streams in the driver - $this->markTestSkipped('$changeStreams currently don\'t on replica sets'); break; case Server::TYPE_RS_PRIMARY: diff --git a/tests/Operation/WatchFunctionalTest.php b/tests/Operation/WatchFunctionalTest.php index 064b02830..760b4cfa5 100644 --- a/tests/Operation/WatchFunctionalTest.php +++ b/tests/Operation/WatchFunctionalTest.php @@ -3,6 +3,7 @@ namespace MongoDB\Tests\Operation; use Closure; +use Iterator; use MongoDB\BSON\TimestampInterface; use MongoDB\ChangeStream; use MongoDB\Driver\Cursor; @@ -19,6 +20,7 @@ use MongoDB\Operation\InsertOne; use MongoDB\Operation\Watch; use MongoDB\Tests\CommandObserver; +use PHPUnit\Framework\ExpectationFailedException; use ReflectionClass; use stdClass; use Symfony\Bridge\PhpUnit\SetUpTearDownTrait; @@ -71,8 +73,7 @@ public function testGetResumeToken() $this->insertDocument(['x' => 1]); $this->insertDocument(['x' => 2]); - $changeStream->next(); - $this->assertTrue($changeStream->valid()); + $this->advanceCursorUntilValid($changeStream); $this->assertSameDocument($changeStream->current()->_id, $changeStream->getResumeToken()); $changeStream->next(); @@ -81,8 +82,7 @@ public function testGetResumeToken() $this->insertDocument(['x' => 3]); - $changeStream->next(); - $this->assertTrue($changeStream->valid()); + $this->advanceCursorUntilValid($changeStream); $this->assertSameDocument($changeStream->current()->_id, $changeStream->getResumeToken()); } @@ -134,23 +134,21 @@ function (array $event) use (&$events) { $this->insertDocument(['x' => 1]); $this->insertDocument(['x' => 2]); - $events = []; + $lastEvent = null; (new CommandObserver())->observe( function () use ($changeStream) { - $changeStream->next(); + $this->advanceCursorUntilValid($changeStream); }, - function (array $event) use (&$events) { - $events[] = $event; + function (array $event) use (&$lastEvent) { + $lastEvent = $event; } ); - $this->assertCount(1, $events); - $this->assertSame('getMore', $events[0]['started']->getCommandName()); - $postBatchResumeToken = $this->getPostBatchResumeTokenFromReply($events[0]['succeeded']->getReply()); + $this->assertNotNull($lastEvent); + $this->assertSame('getMore', $lastEvent['started']->getCommandName()); + $postBatchResumeToken = $this->getPostBatchResumeTokenFromReply($lastEvent['succeeded']->getReply()); - $changeStream->next(); - $this->assertTrue($changeStream->valid()); $this->assertSameDocument($changeStream->current()->_id, $changeStream->getResumeToken()); $changeStream->next(); @@ -171,8 +169,7 @@ public function testNextResumesAfterCursorNotFound() $this->insertDocument(['_id' => 1, 'x' => 'foo']); - $changeStream->next(); - $this->assertTrue($changeStream->valid()); + $this->advanceCursorUntilValid($changeStream); $expectedResult = [ '_id' => $changeStream->current()->_id, @@ -188,8 +185,7 @@ public function testNextResumesAfterCursorNotFound() $this->insertDocument(['_id' => 2, 'x' => 'bar']); - $changeStream->next(); - $this->assertTrue($changeStream->valid()); + $this->advanceCursorUntilValid($changeStream); $expectedResult = [ '_id' => $changeStream->current()->_id, @@ -395,6 +391,8 @@ private function assertStartAtOperationTime(TimestampInterface $expectedOperatio public function testRewindMultipleTimesWithResults() { + $this->skipIfIsShardedCluster('Cursor needs to be advanced multiple times and can\'t be rewound afterwards.'); + $operation = new Watch($this->manager, $this->getDatabaseName(), $this->getCollectionName(), [], $this->defaultOptions); $changeStream = $operation->execute($this->getPrimaryServer()); @@ -487,8 +485,7 @@ public function testNoChangeAfterResumeBeforeInsert() $this->insertDocument(['_id' => 1, 'x' => 'foo']); - $changeStream->next(); - $this->assertTrue($changeStream->valid()); + $this->advanceCursorUntilValid($changeStream); $expectedResult = [ '_id' => $changeStream->current()->_id, @@ -507,8 +504,7 @@ public function testNoChangeAfterResumeBeforeInsert() $this->insertDocument(['_id' => 2, 'x' => 'bar']); - $changeStream->next(); - $this->assertTrue($changeStream->valid()); + $this->advanceCursorUntilValid($changeStream); $expectedResult = [ '_id' => $changeStream->current()->_id, @@ -523,6 +519,8 @@ public function testNoChangeAfterResumeBeforeInsert() public function testResumeMultipleTimesInSuccession() { + $this->skipIfIsShardedCluster('getMore may return empty response before periodicNoopIntervalSecs on sharded clusters.'); + $operation = new Watch($this->manager, $this->getDatabaseName(), $this->getCollectionName(), [], $this->defaultOptions); $changeStream = $operation->execute($this->getPrimaryServer()); @@ -656,8 +654,7 @@ public function testKey() $this->insertDocument(['_id' => 1, 'x' => 'foo']); - $changeStream->next(); - $this->assertTrue($changeStream->valid()); + $this->advanceCursorUntilValid($changeStream); $this->assertSame(0, $changeStream->key()); $changeStream->next(); @@ -676,8 +673,7 @@ public function testKey() $this->insertDocument(['_id' => 2, 'x' => 'bar']); - $changeStream->next(); - $this->assertTrue($changeStream->valid()); + $this->advanceCursorUntilValid($changeStream); $this->assertSame(1, $changeStream->key()); } @@ -693,8 +689,7 @@ public function testNonEmptyPipeline() $changeStream->rewind(); $this->assertFalse($changeStream->valid()); - $changeStream->next(); - $this->assertTrue($changeStream->valid()); + $this->advanceCursorUntilValid($changeStream); $expectedResult = [ '_id' => $changeStream->current()->_id, @@ -765,9 +760,9 @@ public function testNonResumableErrorCodes($errorCode) public function provideNonResumableErrorCodes() { return [ - [136], // CappedPositionLost - [237], // CursorKilled - [11601], // Interrupted + 'CappedPositionLost' => [136], + 'CursorKilled' => [237], + 'Interrupted' => [11601], ]; } @@ -796,7 +791,7 @@ public function testResumeTokenNotFoundClientSideError() $this->expectException(ResumeTokenException::class); $this->expectExceptionMessage('Resume token not found in change document'); - $changeStream->next(); + $this->advanceCursorUntilValid($changeStream); } /** @@ -819,7 +814,7 @@ public function testResumeTokenNotFoundServerSideError() $this->insertDocument(['x' => 1]); $this->expectException(ServerException::class); - $changeStream->next(); + $this->advanceCursorUntilValid($changeStream); } /** @@ -847,7 +842,7 @@ public function testResumeTokenInvalidTypeClientSideError() $this->expectException(ResumeTokenException::class); $this->expectExceptionMessage('Expected resume token to have type "array or object" but found "string"'); - $changeStream->next(); + $this->advanceCursorUntilValid($changeStream); } /** @@ -870,7 +865,7 @@ public function testResumeTokenInvalidTypeServerSideError() $this->insertDocument(['x' => 1]); $this->expectException(ServerException::class); - $changeStream->next(); + $this->advanceCursorUntilValid($changeStream); } public function testMaxAwaitTimeMS() @@ -916,12 +911,24 @@ public function testMaxAwaitTimeMS() $this->insertDocument(['_id' => 1]); /* Advancing the change stream again will issue a getMore, but the - * server should not block since a document has been inserted. */ - $startTime = microtime(true); - $changeStream->next(); - $duration = microtime(true) - $startTime; - $this->assertLessThan($pivot, $duration); + * server should not block since a document has been inserted. + * For sharded clusters, we have to repeat the getMore iteration until + * the cursor is valid since the first getMore commands after an insert + * may not return any data. Only the time of the last getMore command is + * taken. */ + $attempts = $this->isShardedCluster() ? 5 : 1; + for ($i = 0; $i < $attempts; $i++) { + $startTime = microtime(true); + $changeStream->next(); + $duration = microtime(true) - $startTime; + + if ($changeStream->valid()) { + break; + } + } + $this->assertTrue($changeStream->valid()); + $this->assertLessThan($pivot, $duration); } public function testRewindExtractsResumeTokenAndNextResumes() @@ -940,17 +947,25 @@ public function testRewindExtractsResumeTokenAndNextResumes() $changeStream->rewind(); $this->assertFalse($changeStream->valid()); - $changeStream->next(); - $this->assertTrue($changeStream->valid()); + $this->advanceCursorUntilValid($changeStream); $resumeToken = $changeStream->current()->_id; $options = ['resumeAfter' => $resumeToken] + $this->defaultOptions; $operation = new Watch($this->manager, $this->getDatabaseName(), $this->getCollectionName(), [], $options); $changeStream = $operation->execute($this->getPrimaryServer()); - $this->assertSame($resumeToken, $changeStream->getResumeToken()); + $this->assertSameDocument($resumeToken, $changeStream->getResumeToken()); $changeStream->rewind(); - $this->assertTrue($changeStream->valid()); + + if ($this->isShardedCluster()) { + /* aggregate on a sharded cluster may not return any data in the + * initial batch until periodicNoopIntervalSecs has passed. Thus, + * advance the change stream until we've received data. */ + $this->advanceCursorUntilValid($changeStream); + } else { + $this->assertTrue($changeStream->valid()); + } + $this->assertSame(0, $changeStream->key()); $expectedResult = [ '_id' => $changeStream->current()->_id, @@ -963,8 +978,7 @@ public function testRewindExtractsResumeTokenAndNextResumes() $this->killChangeStreamCursor($changeStream); - $changeStream->next(); - $this->assertTrue($changeStream->valid()); + $this->advanceCursorUntilValid($changeStream); $this->assertSame(1, $changeStream->key()); $expectedResult = [ @@ -988,18 +1002,25 @@ public function testResumeAfterOption() $this->insertDocument(['_id' => 1, 'x' => 'foo']); $this->insertDocument(['_id' => 2, 'x' => 'bar']); - $changeStream->next(); - $this->assertTrue($changeStream->valid()); + $this->advanceCursorUntilValid($changeStream); $resumeToken = $changeStream->current()->_id; $options = $this->defaultOptions + ['resumeAfter' => $resumeToken]; $operation = new Watch($this->manager, $this->getDatabaseName(), $this->getCollectionName(), [], $options); $changeStream = $operation->execute($this->getPrimaryServer()); - $this->assertSame($resumeToken, $changeStream->getResumeToken()); + $this->assertSameDocument($resumeToken, $changeStream->getResumeToken()); $changeStream->rewind(); - $this->assertTrue($changeStream->valid()); + + if ($this->isShardedCluster()) { + /* aggregate on a sharded cluster may not return any data in the + * initial batch until periodicNoopIntervalSecs has passed. Thus, + * advance the change stream until we've received data. */ + $this->advanceCursorUntilValid($changeStream); + } else { + $this->assertTrue($changeStream->valid()); + } $expectedResult = [ '_id' => $changeStream->current()->_id, @@ -1027,18 +1048,25 @@ public function testStartAfterOption() $this->insertDocument(['_id' => 1, 'x' => 'foo']); $this->insertDocument(['_id' => 2, 'x' => 'bar']); - $changeStream->next(); - $this->assertTrue($changeStream->valid()); + $this->advanceCursorUntilValid($changeStream); $resumeToken = $changeStream->current()->_id; $options = $this->defaultOptions + ['startAfter' => $resumeToken]; $operation = new Watch($this->manager, $this->getDatabaseName(), $this->getCollectionName(), [], $options); $changeStream = $operation->execute($this->getPrimaryServer()); - $this->assertSame($resumeToken, $changeStream->getResumeToken()); + $this->assertSameDocument($resumeToken, $changeStream->getResumeToken()); $changeStream->rewind(); - $this->assertTrue($changeStream->valid()); + + if ($this->isShardedCluster()) { + /* aggregate on a sharded cluster may not return any data in the + * initial batch until periodicNoopIntervalSecs has passed. Thus, + * advance the change stream until we've received data. */ + $this->advanceCursorUntilValid($changeStream); + } else { + $this->assertTrue($changeStream->valid()); + } $expectedResult = [ '_id' => $changeStream->current()->_id, @@ -1064,8 +1092,7 @@ public function testTypeMapOption(array $typeMap, $expectedChangeDocument) $this->insertDocument(['_id' => 1, 'x' => 'foo']); - $changeStream->next(); - $this->assertTrue($changeStream->valid()); + $this->advanceCursorUntilValid($changeStream); $this->assertMatchesDocument($expectedChangeDocument, $changeStream->current()); } @@ -1114,7 +1141,7 @@ public function testNextAdvancesKey() /* Note: we intentionally do not start iteration with rewind() to ensure * that next() behaves identically when called without rewind(). */ - $changeStream->next(); + $this->advanceCursorUntilValid($changeStream); $this->assertSame(0, $changeStream->key()); @@ -1139,7 +1166,7 @@ public function testResumeTokenNotFoundDoesNotAdvanceKey() $this->assertNull($changeStream->key()); try { - $changeStream->next(); + $this->advanceCursorUntilValid($changeStream); $this->fail('Exception for missing resume token was not thrown'); } catch (ResumeTokenException $e) { /* On server versions < 4.1.8, a client-side error is thrown. */ @@ -1234,7 +1261,7 @@ public function testSessionFreed() // Invalidate the cursor to verify that resumeCallable is unset when the cursor is exhausted. $this->dropCollection(); - $changeStream->next(); + $this->advanceCursorUntilValid($changeStream); $this->assertNull($rp->getValue($changeStream)); } @@ -1356,6 +1383,10 @@ function (array $event) use (&$commandCount) { */ public function testOriginalReadPreferenceIsPreservedOnResume() { + if ($this->isShardedCluster()) { + $this->markTestSkipped('Test does not apply to sharded clusters'); + } + $readPreference = new ReadPreference('secondary'); $options = ['readPreference' => $readPreference] + $this->defaultOptions; $operation = new Watch($this->manager, $this->getDatabaseName(), $this->getCollectionName(), [], $options); @@ -1437,6 +1468,8 @@ public function testResumeTokenBehaviour() $this->markTestSkipped('Testing resumeAfter and startAfter can only be tested on servers >= 4.1.1'); } + $this->skipIfIsShardedCluster('Resume token behaviour can\'t be reliably tested on sharded clusters.'); + $operation = new Watch($this->manager, $this->getDatabaseName(), $this->getCollectionName(), [], $this->defaultOptions); $lastOpTime = null; @@ -1454,7 +1487,7 @@ public function testResumeTokenBehaviour() $this->insertDocument(['x' => 1]); - $changeStream->next(); + $this->advanceCursorUntilValid($changeStream); $this->assertTrue($changeStream->valid()); $resumeToken = $changeStream->getResumeToken(); @@ -1499,7 +1532,7 @@ public function testResumingChangeStreamWithoutPreviousResultsIncludesStartAfter $this->insertDocument(['x' => 1]); - $changeStream->next(); + $this->advanceCursorUntilValid($changeStream); $this->assertTrue($changeStream->valid()); $resumeToken = $changeStream->getResumeToken(); @@ -1546,8 +1579,7 @@ public function testResumingChangeStreamWithPreviousResultsIncludesResumeAfterOp $this->insertDocument(['x' => 1]); - $changeStream->next(); - $this->assertTrue($changeStream->valid()); + $this->advanceCursorUntilValid($changeStream); $resumeToken = $changeStream->getResumeToken(); $options = ['startAfter' => $resumeToken] + $this->defaultOptions; @@ -1556,7 +1588,7 @@ public function testResumingChangeStreamWithPreviousResultsIncludesResumeAfterOp $changeStream->rewind(); $this->insertDocument(['x' => 2]); - $changeStream->next(); + $this->advanceCursorUntilValid($changeStream); $this->assertTrue($changeStream->valid()); $this->killChangeStreamCursor($changeStream); @@ -1637,4 +1669,32 @@ private function killChangeStreamCursor(ChangeStream $changeStream) $operation = new DatabaseCommand($this->getDatabaseName(), $command); $operation->execute($this->getPrimaryServer()); } + + private function advanceCursorUntilValid(Iterator $iterator, $limitOnShardedClusters = 5) + { + if (! $this->isShardedCluster()) { + $iterator->next(); + $this->assertTrue($iterator->valid()); + + return; + } + + for ($i = 0; $i < $limitOnShardedClusters; $i++) { + $iterator->next(); + if ($iterator->valid()) { + return; + } + } + + throw new ExpectationFailedException(sprintf('Expected cursor to return an element but none was found after %d attempts.', $limitOnShardedClusters)); + } + + private function skipIfIsShardedCluster($message) + { + if (! $this->isShardedCluster()) { + return; + } + + $this->markTestSkipped(sprintf('Test does not apply on sharded clusters: %s', $message)); + } } From 944e7292f65ae7ecf500a85ebf043ade69749d03 Mon Sep 17 00:00:00 2001 From: Andreas Braun Date: Fri, 2 Aug 2019 14:10:25 +0200 Subject: [PATCH 2/2] PHPLIB-461: Lower periodicNoopIntervalSecs --- .../sharded_clusters/cluster_replset.json | 24 +++++++++++++++---- 1 file changed, 20 insertions(+), 4 deletions(-) diff --git a/mongo-orchestration/sharded_clusters/cluster_replset.json b/mongo-orchestration/sharded_clusters/cluster_replset.json index 358db4966..80cd4c2bd 100644 --- a/mongo-orchestration/sharded_clusters/cluster_replset.json +++ b/mongo-orchestration/sharded_clusters/cluster_replset.json @@ -50,7 +50,11 @@ "journal": true, "logappend": true, "port": 4400, - "bind_ip_all": true + "bind_ip_all": true, + "setParameter": { + "periodicNoopIntervalSecs": 1, + "writePeriodicNoops": true + } } }, { "procParams": { "dbpath": "/tmp/SHARDED-RS/SHARD1/4401", @@ -59,7 +63,11 @@ "journal": true, "logappend": true, "port": 4401, - "bind_ip_all": true + "bind_ip_all": true, + "setParameter": { + "periodicNoopIntervalSecs": 1, + "writePeriodicNoops": true + } } } ] } @@ -76,7 +84,11 @@ "journal": true, "logappend": true, "port": 4410, - "bind_ip_all": true + "bind_ip_all": true, + "setParameter": { + "periodicNoopIntervalSecs": 1, + "writePeriodicNoops": true + } } }, { "procParams": { "dbpath": "/tmp/SHARDED-RS/SHARD2/4411", @@ -85,7 +97,11 @@ "journal": true, "logappend": true, "port": 4411, - "bind_ip_all": true + "bind_ip_all": true, + "setParameter": { + "periodicNoopIntervalSecs": 1, + "writePeriodicNoops": true + } } } ] }