diff --git a/src/Spanner/Connection/Grpc.php b/src/Spanner/Connection/Grpc.php index 6d93f80cd03e..abaef6047f28 100644 --- a/src/Spanner/Connection/Grpc.php +++ b/src/Spanner/Connection/Grpc.php @@ -17,6 +17,7 @@ namespace Google\Cloud\Spanner\Connection; +use Grpc\UnaryCall; use Google\Cloud\Core\GrpcRequestWrapper; use Google\Cloud\Core\GrpcTrait; use Google\Cloud\Core\LongRunning\OperationResponseTrait; @@ -25,6 +26,7 @@ use Google\Cloud\Spanner\Operation; use Google\Cloud\Spanner\SpannerClient as ManualSpannerClient; use Google\Cloud\Spanner\V1\SpannerClient; +use Google\GAX\AgentHeaderDescriptor; use Google\GAX\Serializer; use Google\Protobuf; use Google\Protobuf\FieldMask; @@ -34,6 +36,7 @@ use Google\Protobuf\Value; use Google\Spanner\Admin\Database\V1\Database; use Google\Spanner\Admin\Instance\V1\Instance; +use Google\Spanner\V1\DeleteSessionRequest; use Google\Spanner\V1\KeySet; use Google\Spanner\V1\Mutation; use Google\Spanner\V1\Mutation_Delete; @@ -116,6 +119,11 @@ class Grpc implements ConnectionInterface */ private $longRunningGrpcClients; + /** + * @var AgentHeaderDescriptor + */ + private $headerDescriptor; + /** * @param array $config [optional] */ @@ -142,16 +150,20 @@ public function __construct(array $config = []) $config['serializer'] = $this->serializer; $this->setRequestWrapper(new GrpcRequestWrapper($config)); - $grpcConfig = $this->getGaxConfig(ManualSpannerClient::VERSION); + $this->spannerClient = isset($config['gapicSpannerClient']) + ? $config['gapicSpannerClient'] + : new SpannerClient($grpcConfig); $this->instanceAdminClient = new InstanceAdminClient($grpcConfig); $this->databaseAdminClient = new DatabaseAdminClient($grpcConfig); - $this->spannerClient = new SpannerClient($grpcConfig); $this->operationsClient = $this->instanceAdminClient->getOperationsClient(); $this->longRunningGrpcClients = [ $this->instanceAdminClient, $this->databaseAdminClient ]; + $this->headerDescriptor = new AgentHeaderDescriptor([ + 'gapicVersion' => trim(file_get_contents(__DIR__ . '/../VERSION')) + ]); } /** @@ -444,6 +456,35 @@ public function deleteSession(array $args) ]); } + /** + * Note: This should be removed once GAPIC exposes the ability to execute + * concurrent requests. + * + * @access private + * @param array $args + * @return UnaryCall + * @experimental + */ + public function deleteSessionAsync(array $args) + { + $database = $this->pluck('database', $args); + $headers = $this->headerDescriptor->getHeader() + + $this->addResourcePrefixHeader($args, $database)['userHeaders']; + $request = new DeleteSessionRequest(); + $request->setName($this->pluck('name', $args)); + $credentialsCallback = $this->spannerClient + ->getCredentialsHelper() + ->createCallCredentialsCallback(); + + return $this->spannerClient + ->getStub() + ->DeleteSession( + $request, + $headers, + ['call_credentials_callback' => $credentialsCallback] + ); + } + /** * @param array $args * @return \Generator diff --git a/src/Spanner/Database.php b/src/Spanner/Database.php index 4c3c7f4c7ebe..27fb931b3d78 100644 --- a/src/Spanner/Database.php +++ b/src/Spanner/Database.php @@ -1457,6 +1457,18 @@ public function identity() ]; } + /** + * Returns the underlying connection. + * + * @access private + * @return ConnectionInterface + * @experimental + */ + public function connection() + { + return $this->connection; + } + /** * Represent the class in a more readable and digestable fashion. * diff --git a/src/Spanner/Session/CacheSessionPool.php b/src/Spanner/Session/CacheSessionPool.php index a533938e8d23..fb6be4c10338 100644 --- a/src/Spanner/Session/CacheSessionPool.php +++ b/src/Spanner/Session/CacheSessionPool.php @@ -23,6 +23,7 @@ use Google\Cloud\Core\Lock\SemaphoreLock; use Google\Cloud\Core\SysvTrait; use Google\Cloud\Spanner\Database; +use Grpc\UnaryCall; use Psr\Cache\CacheItemPoolInterface; /** @@ -39,19 +40,18 @@ * recommended way to bootstrap the session pool. * * Sessions are created on demand up to the maximum session value set during - * instantiation of the pool. After peak usage hours, you may find that more - * sessions are available than your demand may require. It is important to make - * sure the number of active sessions managed by the Spanner backend is kept - * as minimal as possible. In order to help maintain this balance, please use - * the {@see Google\Cloud\Spanner\Session\CacheSessionPool::downsize()} method - * on an interval that matches when you expect to see a decrease in traffic. - * This will help ensure you never run into issues where the Spanner backend is + * instantiation of the pool. To help ensure the minimum number of sessions + * required are managed by the pool, attempts will be made to automatically + * downsize after every 10 minute window. This feature is configurable and one + * may also downsize at their own choosing via + * {@see Google\Cloud\Spanner\Session\CacheSessionPool::downsize()}. Downsizing + * will help ensure you never run into issues where the Spanner backend is * locked up after having met the maximum number of sessions assigned per node. * For reference, the current maximum sessions per database per node is 10k. For * more information on limits please see * [here](https://cloud.google.com/spanner/docs/limits). * - * Additionally, when expecting a long period of inactivity (such as a + * When expecting a long period of inactivity (such as a * maintenance window), please make sure to call * {@see Google\Cloud\Spanner\Session\CacheSessionPool::clear()} in order to * delete any active sessions. @@ -83,9 +83,9 @@ class CacheSessionPool implements SessionPoolInterface use SysvTrait; const CACHE_KEY_TEMPLATE = 'cache-session-pool.%s.%s.%s'; - const DURATION_TWENTY_MINUTES = 1200; const DURATION_ONE_MINUTE = 60; + const WINDOW_SIZE = 600; /** * @var array @@ -95,7 +95,8 @@ class CacheSessionPool implements SessionPoolInterface 'minSessions' => 1, 'shouldWaitForSession' => true, 'maxCyclesToWaitForSession' => 30, - 'sleepIntervalSeconds' => .5 + 'sleepIntervalSeconds' => .5, + 'shouldAutoDownsize' => true ]; /** @@ -118,6 +119,16 @@ class CacheSessionPool implements SessionPoolInterface */ private $database; + /** + * @var UnaryCall[] + */ + private $deleteCalls = []; + + /** + * @var array + */ + private $deleteQueue = []; + /** * @param CacheItemPoolInterface $cacheItemPool A PSR-6 compatible cache * implementation used to store the session data. @@ -140,6 +151,9 @@ class CacheSessionPool implements SessionPoolInterface * **Defaults to** a semaphore based implementation if the * required extensions are installed, otherwise an flock based * implementation. + * @type bool $shouldAutoDownsize Determines whether or not to + * automatically attempt to downsize the pool after every 10 + * minute window. **Defaults to** `true`. * } * @throws \InvalidArgumentException */ @@ -228,10 +242,13 @@ public function acquire($context = SessionPoolInterface::CONTEXT_READ) if (!$exception) { $session = array_shift($data['queue']); - $data['inUse'][$session['name']] = $session + [ 'lastActive' => $this->time() ]; + + if ($this->config['shouldAutoDownsize']) { + $this->manageSessionsToDelete($data); + } } $this->cacheItemPool->save($item->set($data)); @@ -257,6 +274,11 @@ public function acquire($context = SessionPoolInterface::CONTEXT_READ) $session = $this->waitForNextAvailableSession(); } + if ($this->deleteQueue) { + $this->deleteSessions($this->deleteQueue); + $this->deleteQueue = []; + } + return $this->database->session($session['name']); } @@ -414,40 +436,24 @@ public function warmup() /** * Clear the cache and attempt to delete all sessions in the pool. * - * Please note this method will attempt to synchronously delete sessions and - * will block until complete. + * A session may be removed from the cache, but still tracked as active by + * the Spanner backend if a delete operation failed. To ensure you do not + * exceed the maximum number of sessions available per node, please be sure + * to check the return value of this method to be certain all sessions have + * been deleted. */ public function clear() { $sessions = $this->config['lock']->synchronize(function () { - $sessions = []; $item = $this->cacheItemPool->getItem($this->cacheKey); $data = (array) $item->get() ?: $this->initialize(); - - foreach ($data['queue'] as $session) { - $sessions[] = $session['name']; - } - - foreach ($data['inUse'] as $session) { - $sessions[] = $session['name']; - } - + $sessions = $data['queue'] + $data['inUse']; $this->cacheItemPool->clear(); return $sessions; }); - foreach ($sessions as $sessionName) { - $session = $this->database->session($sessionName); - - try { - $session->delete(); - } catch (\Exception $ex) { - if ($ex instanceof NotFoundException) { - continue; - } - } - } + $this->deleteSessions($sessions); } /** @@ -557,7 +563,9 @@ private function initialize() return [ 'queue' => [], 'inUse' => [], - 'toCreate' => [] + 'toCreate' => [], + 'windowStart' => $this->time(), + 'maxInUseSessions' => 0 ]; } @@ -570,17 +578,13 @@ private function initialize() */ private function getSessionCount(array $data) { - $count = 0; - - foreach ($data as $sessionType) { - $count += count($sessionType); - } - - return $count; + return count($data['queue']) + + count($data['inUse']) + + count($data['toCreate']); } /** - * Gets the next session in the queue, clearing out which are expired. + * Gets the next session in the queue, clearing out any which are expired. * * @param array $data * @return array|null @@ -597,6 +601,10 @@ private function getSession(array &$data) $data['inUse'][$session['name']] = $session + [ 'lastActive' => $this->time() ]; + + if ($this->config['shouldAutoDownsize']) { + $this->manageSessionsToDelete($data); + } } return $session; @@ -746,4 +754,62 @@ private function validateConfig() ); } } + + /** + * Delete the provided sessions. + * + * @param array $sessions + */ + private function deleteSessions(array $sessions) + { + // gRPC calls appear to cancel when the corresponding UnaryCall object + // goes out of scope. Keeping the calls in scope allows time for the + // calls to complete at the expense of a small memory footprint. + $this->deleteCalls = []; + + foreach ($sessions as $session) { + $this->deleteCalls[] = $this->database->connection() + ->deleteSessionAsync([ + 'name' => $session['name'], + 'database' => $this->database->name() + ]); + } + } + + /** + * Checks the maximum number of sessions in use over the last window(s) then + * removes the sessions from the cache and prepares them to be deleted from + * the Spanner backend. + * + * @param array $data + */ + private function manageSessionsToDelete(array &$data) + { + $secondsSinceLastWindow = $this->time() - $data['windowStart']; + $inUseCount = count($data['inUse']); + + if ($secondsSinceLastWindow < self::WINDOW_SIZE + 1) { + if ($data['maxInUseSessions'] < $inUseCount) { + $data['maxInUseSessions'] = $inUseCount; + } + + return; + } + + $totalCount = $inUseCount + count($data['queue']) + count($data['toCreate']); + $windowsPassed = (int) ($secondsSinceLastWindow / self::WINDOW_SIZE); + $deletionCount = min( + $totalCount - (int) round($data['maxInUseSessions'] / $windowsPassed), + $totalCount - $this->config['minSessions'] + ); + $data['maxInUseSessions'] = $inUseCount; + $data['windowStart'] = $this->time(); + + if ($deletionCount) { + $this->deleteQueue += array_splice( + $data['queue'], + (int) -$deletionCount + ); + } + } } diff --git a/src/Spanner/V1/SpannerClient.php b/src/Spanner/V1/SpannerClient.php index e078ad3ca308..acf15f140c11 100644 --- a/src/Spanner/V1/SpannerClient.php +++ b/src/Spanner/V1/SpannerClient.php @@ -31,11 +31,35 @@ namespace Google\Cloud\Spanner\V1; use Google\Cloud\Spanner\V1\Gapic\SpannerGapicClient; +use Google\GAX\GrpcCredentialsHelper; +use Google\Spanner\V1\SpannerGrpcClient; /** * {@inheritdoc} */ class SpannerClient extends SpannerGapicClient { - // This class is intentionally empty, and is intended to hold manual additions to the generated {@see SpannerClientImpl} class. + /** + * Returns the underlying stub. + * + * @access private + * @return SpannerGrpcClient + * @experimental + */ + public function getStub() + { + return $this->spannerStub; + } + + /** + * Returns the underlying gRPC credentials helper. + * + * @access private + * @return GrpcCredentialsHelper + * @experimental + */ + public function getCredentialsHelper() + { + return $this->grpcCredentialsHelper; + } } diff --git a/tests/unit/Spanner/Connection/GrpcTest.php b/tests/unit/Spanner/Connection/GrpcTest.php index b9b6fa8432ef..4c6a3420a3e8 100644 --- a/tests/unit/Spanner/Connection/GrpcTest.php +++ b/tests/unit/Spanner/Connection/GrpcTest.php @@ -17,11 +17,14 @@ namespace Google\Cloud\Tests\Unit\Spanner\Connection; +use Grpc\UnaryCall; use Google\Cloud\Core\GrpcRequestWrapper; use Google\Cloud\Core\GrpcTrait; use Google\Cloud\Spanner\Connection\Grpc; +use Google\Cloud\Spanner\V1\SpannerClient; use Google\Cloud\Spanner\ValueMapper; use Google\Cloud\Tests\GrpcTestTrait; +use Google\GAX\GrpcCredentialsHelper; use Google\GAX\OperationResponse; use Google\GAX\Serializer; use Google\Protobuf\FieldMask; @@ -31,12 +34,14 @@ use Google\Spanner\V1\Mutation_Write; use Google\Spanner\V1\TransactionOptions_ReadOnly; use Google\Spanner\V1\TransactionOptions_ReadWrite; -use Prophecy\Argument; +use Google\Spanner\V1\DeleteSessionRequest; use Google\Spanner\V1\KeySet; use Google\Spanner\V1\Mutation; +use Google\Spanner\V1\SpannerGrpcClient; use Google\Spanner\V1\TransactionOptions; use Google\Spanner\V1\TransactionSelector; use Google\Spanner\V1\Type; +use Prophecy\Argument; /** * @group spanner @@ -59,6 +64,37 @@ public function setUp() $this->successMessage = 'success'; } + public function testDeleteSessionAsync() + { + $cb = function () {}; + $sessionName = 'session1'; + $databaseName = 'database1'; + $request = new DeleteSessionRequest(); + $request->setName($sessionName); + $unaryCall = $this->prophesize(UnaryCall::class); + $credentialsHelper = $this->prophesize(GrpcCredentialsHelper::class); + $client = $this->prophesize(SpannerClient::class); + $stub = $this->prophesize(SpannerGrpcClient::class); + $credentialsHelper->createCallCredentialsCallback() + ->willReturn($cb); + $stub->DeleteSession( + $request, + Argument::type('array'), + Argument::withKey('call_credentials_callback') + )->willReturn($unaryCall->reveal()); + $client->getStub() + ->willReturn($stub->reveal()); + $client->getCredentialsHelper() + ->willReturn($credentialsHelper->reveal()); + $grpc = new Grpc(['gapicSpannerClient' => $client->reveal()]); + $call = $grpc->deleteSessionAsync([ + 'name' => $sessionName, + 'database' => $databaseName + ]); + + $this->assertInstanceOf(UnaryCall::class, $call); + } + /** * @dataProvider methodProvider */ @@ -234,6 +270,8 @@ public function methodProvider() $mutation->setInsert($operation); $insertMutationsArr[] = $mutation; + $deleteSessionRequest = $this->prophesize(DeleteSessionRequest::class)->reveal(); + return [ [ 'listInstanceConfigs', diff --git a/tests/unit/Spanner/Session/CacheSessionPoolTest.php b/tests/unit/Spanner/Session/CacheSessionPoolTest.php index 6c0241a82789..272dda2a9347 100644 --- a/tests/unit/Spanner/Session/CacheSessionPoolTest.php +++ b/tests/unit/Spanner/Session/CacheSessionPoolTest.php @@ -21,10 +21,12 @@ use Google\Auth\Cache\MemoryCacheItemPool; use Google\Cloud\Core\Lock\MockValues; +use Google\Cloud\Spanner\Connection\Grpc; use Google\Cloud\Spanner\Database; use Google\Cloud\Spanner\Session\CacheSessionPool; use Google\Cloud\Spanner\Session\Session; use Google\Cloud\Tests\GrpcTestTrait; +use Grpc\UnaryCall; use Psr\Cache\CacheItemInterface; use Psr\Cache\CacheItemPoolInterface; use Prophecy\Argument; @@ -98,7 +100,9 @@ public function testAcquireThrowsExceptionWhenMaxCyclesMet() 'lastActive' => $this->time ] ], - 'toCreate' => [] + 'toCreate' => [], + 'windowStart' => $this->time, + 'maxInUseSessions' => 1 ]; $pool = new CacheSessionPoolStub($this->getCacheItemPool($cacheData), $config, $this->time); $pool->setDatabase($this->getDatabase()); @@ -123,7 +127,9 @@ public function testAcquireThrowsExceptionWithNoAvailableSessions() 'lastActive' => $this->time ] ], - 'toCreate' => [] + 'toCreate' => [], + 'windowStart' => $this->time, + 'maxInUseSessions' => 1 ]; $pool = new CacheSessionPoolStub($this->getCacheItemPool($cacheData), $config, $this->time); $pool->setDatabase($this->getDatabase()); @@ -163,7 +169,9 @@ public function testRelease() 'lastActive' => $this->time ] ], - 'toCreate' => [] + 'toCreate' => [], + 'windowStart' => $this->time, + 'maxInUseSessions' => 1 ]; $expectedCacheData = [ 'queue' => [ @@ -173,7 +181,9 @@ public function testRelease() ] ], 'inUse' => [], - 'toCreate' => [] + 'toCreate' => [], + 'windowStart' => $this->time, + 'maxInUseSessions' => 1 ]; $session = $this->prophesize(Session::class); $session->name() @@ -207,7 +217,9 @@ public function testKeepAlive() 'lastActive' => $lastActiveOriginal ] ], - 'toCreate' => [] + 'toCreate' => [], + 'windowStart' => $this->time, + 'maxInUseSessions' => 1 ]), [], $this->time); $pool->setDatabase($this->getDatabase()); $actualItemPool = $pool->cacheItemPool(); @@ -255,7 +267,9 @@ public function testDownsizeDeletes($percent, $expectedDeleteCount) ] ], 'inUse' => [], - 'toCreate' => [] + 'toCreate' => [], + 'windowStart' => $this->time, + 'maxInUseSessions' => 1 ])); $pool->setDatabase($this->getDatabase(false, true)); @@ -361,7 +375,9 @@ public function acquireDataProvider() 'lastActive' => $time ] ], - 'toCreate' => [] + 'toCreate' => [], + 'windowStart' => $time, + 'maxInUseSessions' => 1 ], $time ], @@ -376,7 +392,9 @@ public function acquireDataProvider() ] ], 'inUse' => [], - 'toCreate' => [] + 'toCreate' => [], + 'windowStart' => $time, + 'maxInUseSessions' => 1 ], [ 'queue' => [], @@ -387,7 +405,9 @@ public function acquireDataProvider() 'lastActive' => $time ] ], - 'toCreate' => [] + 'toCreate' => [], + 'windowStart' => $time, + 'maxInUseSessions' => 1 ], $time ], @@ -404,7 +424,9 @@ public function acquireDataProvider() 'lastActive' => $time ] ], - 'toCreate' => [] + 'toCreate' => [], + 'windowStart' => $time, + 'maxInUseSessions' => 1 ], [ 'queue' => [], @@ -420,7 +442,9 @@ public function acquireDataProvider() 'lastActive' => $time ] ], - 'toCreate' => [] + 'toCreate' => [], + 'windowStart' => $time, + 'maxInUseSessions' => 2 ], $time ], @@ -443,7 +467,9 @@ public function acquireDataProvider() ], 'toCreate' => [ 'oldguy' => $time - 1201 - ] + ], + 'windowStart' => $time, + 'maxInUseSessions' => 2 ], [ 'queue' => [], @@ -454,7 +480,9 @@ public function acquireDataProvider() 'lastActive' => $time ] ], - 'toCreate' => [] + 'toCreate' => [], + 'windowStart' => $time, + 'maxInUseSessions' => 2 ], $time ], @@ -469,7 +497,9 @@ public function acquireDataProvider() ] ], 'inUse' => [], - 'toCreate' => [] + 'toCreate' => [], + 'windowStart' => $time, + 'maxInUseSessions' => 1 ], [ 'queue' => [], @@ -480,7 +510,9 @@ public function acquireDataProvider() 'lastActive' => $time ] ], - 'toCreate' => [] + 'toCreate' => [], + 'windowStart' => $time, + 'maxInUseSessions' => 1 ], $time ], @@ -499,7 +531,9 @@ public function acquireDataProvider() ] ], 'inUse' => [], - 'toCreate' => [] + 'toCreate' => [], + 'windowStart' => $time, + 'maxInUseSessions' => 1 ], [ 'queue' => [], @@ -510,7 +544,9 @@ public function acquireDataProvider() 'lastActive' => $time ] ], - 'toCreate' => [] + 'toCreate' => [], + 'windowStart' => $time, + 'maxInUseSessions' => 1 ], $time ], @@ -526,7 +562,9 @@ public function acquireDataProvider() 'lastActive' => $time - 1201 ] ], - 'toCreate' => [] + 'toCreate' => [], + 'windowStart' => $time, + 'maxInUseSessions' => 1 ], [ 'queue' => [], @@ -537,10 +575,54 @@ public function acquireDataProvider() 'lastActive' => $time ] ], - 'toCreate' => [] + 'toCreate' => [], + 'windowStart' => $time, + 'maxInUseSessions' => 1 ], $time ], + // Set #7: Auto downsize pool + [ + ['maxSessions' => 5], + [ + 'queue' => [ + [ + 'name' => 'session1', + 'expiration' => $time + 3600 + ], + [ + 'name' => 'session2', + 'expiration' => $time + 3600 + ], + [ + 'name' => 'session3', + 'expiration' => $time + 3600 + ], + [ + 'name' => 'session4', + 'expiration' => $time + 3600 + ] + ], + 'inUse' => [], + 'toCreate' => [], + 'windowStart' => $time - 601, + 'maxInUseSessions' => 1 + ], + [ + 'queue' => [], + 'inUse' => [ + 'session1' => [ + 'name' => 'session1', + 'expiration' => $time + 3600, + 'lastActive' => $time + ] + ], + 'toCreate' => [], + 'windowStart' => $time, + 'maxInUseSessions' => 1 + ], + $time + ] ]; } @@ -549,17 +631,25 @@ private function getDatabase($shouldCreateThrowException = false, $willDeleteSes $database = $this->prophesize(Database::class); $createdSession = $this->prophesize(Session::class); $session = $this->prophesize(Session::class); + $connection = $this->prophesize(Grpc::class); + $unaryCall = $this->prophesize(UnaryCall::class); $createdSession->expiration() ->willReturn($this->time + 3600); $session->expiration() ->willReturn($this->time + 3600); $session->exists() ->willReturn(false); + $unaryCall->wait() + ->willReturn(null); + $connection->deleteSessionAsync(Argument::any()) + ->willReturn($unaryCall->reveal()); if ($willDeleteSessions) { $session->delete() ->willReturn(null); } + $database->connection() + ->willReturn($connection->reveal()); $database->session(Argument::any()) ->will(function ($args) use ($session) { $session->name() @@ -573,6 +663,8 @@ private function getDatabase($shouldCreateThrowException = false, $willDeleteSes 'database' => self::DATABASE_NAME, 'instance' => self::INSTANCE_NAME ]); + $database->name() + ->willReturn(self::DATABASE_NAME); if ($shouldCreateThrowException) { $database->createSession()