diff --git a/src/BigQuery/BigQueryClient.php b/src/BigQuery/BigQueryClient.php index cfa25d9e09a6..598f0f1f66fd 100644 --- a/src/BigQuery/BigQueryClient.php +++ b/src/BigQuery/BigQueryClient.php @@ -19,11 +19,13 @@ use Google\Cloud\BigQuery\Connection\ConnectionInterface; use Google\Cloud\BigQuery\Connection\Rest; +use Google\Cloud\BigQuery\Job; use Google\Cloud\Core\ArrayTrait; -use Google\Cloud\Core\Iterator\ItemIterator; -use Google\Cloud\Core\Iterator\PageIterator; use Google\Cloud\Core\ClientTrait; +use Google\Cloud\Core\ExponentialBackoff; use Google\Cloud\Core\Int64; +use Google\Cloud\Core\Iterator\ItemIterator; +use Google\Cloud\Core\Iterator\PageIterator; use Psr\Cache\CacheItemPoolInterface; use Psr\Http\Message\StreamInterface; @@ -138,14 +140,6 @@ public function __construct(array $config = []) * ``` * $queryResults = $bigQuery->runQuery('SELECT commit FROM [bigquery-public-data:github_repos.commits] LIMIT 100'); * - * $isComplete = $queryResults->isComplete(); - * - * while (!$isComplete) { - * sleep(1); // let's wait for a moment... - * $queryResults->reload(); // trigger a network request - * $isComplete = $queryResults->isComplete(); // check the query's status - * } - * * foreach ($queryResults->rows() as $row) { * echo $row['commit']; * } @@ -162,14 +156,6 @@ public function __construct(array $config = []) * ] * ]); * - * $isComplete = $queryResults->isComplete(); - * - * while (!$isComplete) { - * sleep(1); // let's wait for a moment... - * $queryResults->reload(); // trigger a network request - * $isComplete = $queryResults->isComplete(); // check the query's status - * } - * * foreach ($queryResults->rows() as $row) { * echo $row['commit']; * } @@ -182,14 +168,6 @@ public function __construct(array $config = []) * 'parameters' => ['A commit message.'] * ]); * - * $isComplete = $queryResults->isComplete(); - * - * while (!$isComplete) { - * sleep(1); // let's wait for a moment... - * $queryResults->reload(); // trigger a network request - * $isComplete = $queryResults->isComplete(); // check the query's status - * } - * * foreach ($queryResults->rows() as $row) { * echo $row['commit']; * } @@ -211,6 +189,8 @@ public function __construct(array $config = []) * qualified in the format 'datasetId.tableId'. * @type int $timeoutMs How long to wait for the query to complete, in * milliseconds. **Defaults to** `10000` milliseconds (10 seconds). + * @type int $maxRetries The number of times to retry, checking if the + * query has completed. **Defaults to** `100`. * @type bool $useQueryCache Whether to look for the result in the query * cache. * @type bool $useLegacySql Specifies whether to use BigQuery's legacy @@ -223,20 +203,29 @@ public function __construct(array $config = []) * named parameters will be used (`@name`). * } * @return QueryResults + * @throws \RuntimeException if the maximum number of retries while waiting + * for query completion has been exceeded. */ public function runQuery($query, array $options = []) { + $options += [ + 'maxRetries' => 100 + ]; + if (isset($options['parameters'])) { $options += $this->formatQueryParameters($options['parameters']); unset($options['parameters']); } + $queryOptions = $options; + unset($queryOptions['timeoutMs'], $queryOptions['maxRetries']); + $response = $this->connection->query([ 'projectId' => $this->projectId, 'query' => $query - ] + $options); + ] + $queryOptions); - return new QueryResults( + $results = new QueryResults( $this->connection, $response['jobReference']['jobId'], $this->projectId, @@ -244,6 +233,21 @@ public function runQuery($query, array $options = []) $options, $this->mapper ); + + if (!$results->isComplete()) { + $retryFn = function (QueryResults $results, array $options) { + $results->reload($options); + + if (!$results->isComplete()) { + throw new \RuntimeException('Job did not complete within the allowed number of retries.'); + } + }; + + $retry = new ExponentialBackoff($options['maxRetries']); + $retry->execute($retryFn, [$results, $options]); + } + + return $results; } /** diff --git a/tests/snippets/BigQuery/BigQueryClientTest.php b/tests/snippets/BigQuery/BigQueryClientTest.php index b6527511ef23..0f9693fab76e 100644 --- a/tests/snippets/BigQuery/BigQueryClientTest.php +++ b/tests/snippets/BigQuery/BigQueryClientTest.php @@ -79,7 +79,6 @@ public function testRunQuery() { $snippet = $this->snippetFromMethod(BigQueryClient::class, 'runQuery'); $snippet->addLocal('bigQuery', $this->client); - $snippet->replace('sleep(1);', ''); $this->connection->query(Argument::any()) ->shouldBeCalled() ->willReturn([ @@ -102,7 +101,6 @@ public function testRunQueryWithNamedParameters() { $snippet = $this->snippetFromMethod(BigQueryClient::class, 'runQuery', 1); $snippet->addLocal('bigQuery', $this->client); - $snippet->replace('sleep(1);', ''); $this->connection ->query(Argument::withEntry('queryParameters', [ [ @@ -145,7 +143,6 @@ public function testRunQueryWithPositionalParameters() { $snippet = $this->snippetFromMethod(BigQueryClient::class, 'runQuery', 2); $snippet->addLocal('bigQuery', $this->client); - $snippet->replace('sleep(1);', ''); $this->connection ->query(Argument::withEntry('queryParameters', [ [ diff --git a/tests/unit/BigQuery/BigQueryClientTest.php b/tests/unit/BigQuery/BigQueryClientTest.php index 190691fffd9a..6fddc4bb132f 100644 --- a/tests/unit/BigQuery/BigQueryClientTest.php +++ b/tests/unit/BigQuery/BigQueryClientTest.php @@ -26,6 +26,7 @@ use Google\Cloud\BigQuery\QueryResults; use Google\Cloud\BigQuery\Time; use Google\Cloud\BigQuery\Timestamp; +use Google\Cloud\Core\Iterator\ItemIterator; use Prophecy\Argument; /** @@ -54,9 +55,40 @@ public function testRunsQuery($query, $options, $expected) ->willReturn([ 'jobReference' => [ 'jobId' => $this->jobId - ] + ], + 'jobComplete' => true + ]) + ->shouldBeCalledTimes(1); + $this->client->setConnection($this->connection->reveal()); + $queryResults = $this->client->runQuery($query, $options); + + $this->assertInstanceOf(QueryResults::class, $queryResults); + $this->assertEquals($this->jobId, $queryResults->identity()['jobId']); + } + + /** + * @dataProvider queryDataProvider + */ + public function testRunsQueryWithRetry($query, $options, $expected) + { + $this->connection->query($expected) + ->willReturn([ + 'jobReference' => [ + 'jobId' => $this->jobId + ], + 'jobComplete' => false + ]) + ->shouldBeCalledTimes(1); + + $this->connection->getQueryResults(Argument::any()) + ->willReturn([ + 'jobReference' => [ + 'jobId' => $this->jobId + ], + 'jobComplete' => true ]) ->shouldBeCalledTimes(1); + $this->client->setConnection($this->connection->reveal()); $queryResults = $this->client->runQuery($query, $options);