Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
60 changes: 32 additions & 28 deletions src/BigQuery/BigQueryClient.php
Original file line number Diff line number Diff line change
Expand Up @@ -19,11 +19,13 @@

use Google\Cloud\BigQuery\Connection\ConnectionInterface;
use Google\Cloud\BigQuery\Connection\Rest;
use Google\Cloud\BigQuery\Job;
use Google\Cloud\Core\ArrayTrait;
use Google\Cloud\Core\Iterator\ItemIterator;
use Google\Cloud\Core\Iterator\PageIterator;
use Google\Cloud\Core\ClientTrait;
use Google\Cloud\Core\ExponentialBackoff;
use Google\Cloud\Core\Int64;
use Google\Cloud\Core\Iterator\ItemIterator;
use Google\Cloud\Core\Iterator\PageIterator;
use Psr\Cache\CacheItemPoolInterface;
use Psr\Http\Message\StreamInterface;

Expand Down Expand Up @@ -138,14 +140,6 @@ public function __construct(array $config = [])
* ```
* $queryResults = $bigQuery->runQuery('SELECT commit FROM [bigquery-public-data:github_repos.commits] LIMIT 100');
*
* $isComplete = $queryResults->isComplete();
*
* while (!$isComplete) {
* sleep(1); // let's wait for a moment...
* $queryResults->reload(); // trigger a network request
* $isComplete = $queryResults->isComplete(); // check the query's status
* }
*
* foreach ($queryResults->rows() as $row) {
* echo $row['commit'];
* }
Expand All @@ -162,14 +156,6 @@ public function __construct(array $config = [])
* ]
* ]);
*
* $isComplete = $queryResults->isComplete();
*
* while (!$isComplete) {
* sleep(1); // let's wait for a moment...
* $queryResults->reload(); // trigger a network request
* $isComplete = $queryResults->isComplete(); // check the query's status
* }
*
* foreach ($queryResults->rows() as $row) {
* echo $row['commit'];
* }
Expand All @@ -182,14 +168,6 @@ public function __construct(array $config = [])
* 'parameters' => ['A commit message.']
* ]);
*
* $isComplete = $queryResults->isComplete();
*
* while (!$isComplete) {
* sleep(1); // let's wait for a moment...
* $queryResults->reload(); // trigger a network request
* $isComplete = $queryResults->isComplete(); // check the query's status
* }
*
* foreach ($queryResults->rows() as $row) {
* echo $row['commit'];
* }
Expand All @@ -211,6 +189,8 @@ public function __construct(array $config = [])
* qualified in the format 'datasetId.tableId'.
* @type int $timeoutMs How long to wait for the query to complete, in
* milliseconds. **Defaults to** `10000` milliseconds (10 seconds).
* @type int $maxRetries The number of times to retry, checking if the
* query has completed. **Defaults to** `100`.
* @type bool $useQueryCache Whether to look for the result in the query
* cache.
* @type bool $useLegacySql Specifies whether to use BigQuery's legacy
Expand All @@ -223,27 +203,51 @@ public function __construct(array $config = [])
* named parameters will be used (`@name`).
* }
* @return QueryResults
* @throws \RuntimeException if the maximum number of retries while waiting
* for query completion has been exceeded.
*/
public function runQuery($query, array $options = [])
{
$options += [
'maxRetries' => 100
];

if (isset($options['parameters'])) {
$options += $this->formatQueryParameters($options['parameters']);
unset($options['parameters']);
}

$queryOptions = $options;
unset($queryOptions['timeoutMs'], $queryOptions['maxRetries']);

$response = $this->connection->query([
'projectId' => $this->projectId,
'query' => $query
] + $options);
] + $queryOptions);

return new QueryResults(
$results = new QueryResults(
$this->connection,
$response['jobReference']['jobId'],
$this->projectId,
$response,
$options,
$this->mapper
);

if (!$results->isComplete()) {
$retryFn = function (QueryResults $results, array $options) {
$results->reload($options);

if (!$results->isComplete()) {
throw new \RuntimeException('Job did not complete within the allowed number of retries.');
}
};

$retry = new ExponentialBackoff($options['maxRetries']);
$retry->execute($retryFn, [$results, $options]);
}

return $results;
}

/**
Expand Down
3 changes: 0 additions & 3 deletions tests/snippets/BigQuery/BigQueryClientTest.php
Original file line number Diff line number Diff line change
Expand Up @@ -79,7 +79,6 @@ public function testRunQuery()
{
$snippet = $this->snippetFromMethod(BigQueryClient::class, 'runQuery');
$snippet->addLocal('bigQuery', $this->client);
$snippet->replace('sleep(1);', '');
$this->connection->query(Argument::any())
->shouldBeCalled()
->willReturn([
Expand All @@ -102,7 +101,6 @@ public function testRunQueryWithNamedParameters()
{
$snippet = $this->snippetFromMethod(BigQueryClient::class, 'runQuery', 1);
$snippet->addLocal('bigQuery', $this->client);
$snippet->replace('sleep(1);', '');
$this->connection
->query(Argument::withEntry('queryParameters', [
[
Expand Down Expand Up @@ -145,7 +143,6 @@ public function testRunQueryWithPositionalParameters()
{
$snippet = $this->snippetFromMethod(BigQueryClient::class, 'runQuery', 2);
$snippet->addLocal('bigQuery', $this->client);
$snippet->replace('sleep(1);', '');
$this->connection
->query(Argument::withEntry('queryParameters', [
[
Expand Down
34 changes: 33 additions & 1 deletion tests/unit/BigQuery/BigQueryClientTest.php
Original file line number Diff line number Diff line change
Expand Up @@ -26,6 +26,7 @@
use Google\Cloud\BigQuery\QueryResults;
use Google\Cloud\BigQuery\Time;
use Google\Cloud\BigQuery\Timestamp;
use Google\Cloud\Core\Iterator\ItemIterator;
use Prophecy\Argument;

/**
Expand Down Expand Up @@ -54,9 +55,40 @@ public function testRunsQuery($query, $options, $expected)
->willReturn([
'jobReference' => [
'jobId' => $this->jobId
]
],
'jobComplete' => true
])
->shouldBeCalledTimes(1);
$this->client->setConnection($this->connection->reveal());
$queryResults = $this->client->runQuery($query, $options);

$this->assertInstanceOf(QueryResults::class, $queryResults);
$this->assertEquals($this->jobId, $queryResults->identity()['jobId']);
}

/**
* @dataProvider queryDataProvider
*/
public function testRunsQueryWithRetry($query, $options, $expected)
{
$this->connection->query($expected)
->willReturn([
'jobReference' => [
'jobId' => $this->jobId
],
'jobComplete' => false
])
->shouldBeCalledTimes(1);

$this->connection->getQueryResults(Argument::any())
->willReturn([
'jobReference' => [
'jobId' => $this->jobId
],
'jobComplete' => true
])
->shouldBeCalledTimes(1);

$this->client->setConnection($this->connection->reveal());
$queryResults = $this->client->runQuery($query, $options);

Expand Down