Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
218 changes: 116 additions & 102 deletions src/BigQuery/BigQueryClient.php
Original file line number Diff line number Diff line change
Expand Up @@ -21,11 +21,12 @@
use Google\Cloud\BigQuery\Connection\Rest;
use Google\Cloud\BigQuery\Exception\JobException;
use Google\Cloud\BigQuery\Job;
use Google\Cloud\Core\ArrayTrait;
use Google\Cloud\Core\ClientTrait;
use Google\Cloud\Core\ExponentialBackoff;
use Google\Cloud\Core\Int64;
use Google\Cloud\Core\Iterator\ItemIterator;
use Google\Cloud\Core\Iterator\PageIterator;
use Google\Cloud\Core\RetryDeciderTrait;
use Psr\Cache\CacheItemPoolInterface;
use Psr\Http\Message\StreamInterface;

Expand All @@ -43,11 +44,14 @@
*/
class BigQueryClient
{
use ArrayTrait;
use ClientTrait;
use JobConfigurationTrait;
use RetryDeciderTrait;

const VERSION = '0.2.2';

const MAX_DELAY_MICROSECONDS = 32000000;

const SCOPE = 'https://www.googleapis.com/auth/bigquery';
const INSERT_SCOPE = 'https://www.googleapis.com/auth/bigquery.insertdata';

Expand All @@ -57,7 +61,7 @@ class BigQueryClient
protected $connection;

/**
* @var ValueMapper $mapper Maps values between PHP and BigQuery.
* @var ValueMapper Maps values between PHP and BigQuery.
*/
private $mapper;

Expand Down Expand Up @@ -96,15 +100,85 @@ class BigQueryClient
*/
public function __construct(array $config = [])
{
// No HTTP status codes are registered for retry; only the two error
// messages below are. Both setters are defined outside this view and are
// presumably consumed by getRetryFunction() further down -- TODO confirm
// in RetryDeciderTrait.
$this->setHttpRetryCodes([]);
$this->setHttpRetryMessages([
'rateLimitExceeded',
'backendError'
]);
// Array union (`+=`): any key already supplied by the caller is kept, so
// everything listed here only acts as a default.
$config += [
'scopes' => [self::SCOPE],
// NOTE(review): the next two lines are the before/after sides of a diff.
// The first one lacks its trailing comma and cannot coexist with the
// second in a real PHP file.
'returnInt64AsObject' => false
'returnInt64AsObject' => false,
'restRetryFunction' => $this->getRetryFunction(),
// Delay grows as 2^attempt * 1,000,000 plus a random jitter of up to
// 1,000,000, and is capped at MAX_DELAY_MICROSECONDS (32,000,000).
// Units are presumably microseconds, going by the constant's name.
'restDelayFunction' => function ($attempt) {
return min(
mt_rand(0, 1000000) + (pow(2, $attempt) * 1000000),
self::MAX_DELAY_MICROSECONDS
);
}
];

// configureAuthentication() is not visible here (presumably provided by
// ClientTrait); its result is handed to the REST connection as-is.
$this->connection = new Rest($this->configureAuthentication($config));
// The mapper is told whether 64-bit integers should be returned as objects.
$this->mapper = new ValueMapper($config['returnInt64AsObject']);
}

/**
 * Builds a query job configuration for the given SQL statement.
 *
 * The returned configuration is meant to be handed to either
 * {@see Google\Cloud\BigQuery\BigQueryClient::runQuery()} or
 * {@see Google\Cloud\BigQuery\BigQueryClient::startQuery()}. It can be
 * refined afterwards with fluent setters, or fully described up front
 * through the `$options` array.
 *
 * Options that are not set fall back to the defaults described in the
 * [Jobs configuration API documentation]
 * (https://cloud.google.com/bigquery/docs/reference/rest/v2/jobs#configuration).
 * The one exception is `configuration.query.useLegacySql`, which this
 * client defaults to `false`.
 *
 * Example:
 * ```
 * $queryJobConfig = $bigQuery->query(
 *     'SELECT commit FROM `bigquery-public-data.github_repos.commits` LIMIT 100'
 * );
 * ```
 *
 * ```
 * // Set create disposition using fluent setters.
 * $queryJobConfig = $bigQuery->query(
 *     'SELECT commit FROM `bigquery-public-data.github_repos.commits` LIMIT 100'
 * )->createDisposition('CREATE_NEVER');
 * ```
 *
 * ```
 * // Same result as the previous example, expressed through the options
 * // array rather than fluent setters.
 * $queryJobConfig = $bigQuery->query(
 *     'SELECT commit FROM `bigquery-public-data.github_repos.commits` LIMIT 100',
 *     [
 *         'configuration' => [
 *             'query' => [
 *                 'createDisposition' => 'CREATE_NEVER'
 *             ]
 *         ]
 *     ]
 * );
 * ```
 *
 * @param string $query A BigQuery SQL query.
 * @param array $options [optional] Please see the
 *        [API documentation for Job configuration]
 *        (https://cloud.google.com/bigquery/docs/reference/rest/v2/jobs#configuration)
 *        for the available options.
 * @return QueryJobConfiguration
 */
public function query($query, array $options = [])
{
    // Create the configuration first, then attach the SQL text to it.
    $configuration = new QueryJobConfiguration(
        $this->mapper,
        $this->projectId,
        $options
    );

    return $configuration->query($query);
}

/**
* Runs a BigQuery SQL query in a synchronous fashion. Rows are returned
* immediately as long as the query completes within a specified timeout. In
Expand Down Expand Up @@ -137,7 +211,10 @@ public function __construct(array $config = [])
*
* Example:
* ```
* $queryResults = $bigQuery->runQuery('SELECT commit FROM `bigquery-public-data.github_repos.commits` LIMIT 100');
* $queryJobConfig = $bigQuery->query(
* 'SELECT commit FROM `bigquery-public-data.github_repos.commits` LIMIT 100'
* );
* $queryResults = $bigQuery->runQuery($queryJobConfig);
*
* foreach ($queryResults as $row) {
* echo $row['commit'];
Expand All @@ -148,12 +225,12 @@ public function __construct(array $config = [])
* // Construct a query utilizing named parameters.
* $query = 'SELECT commit FROM `bigquery-public-data.github_repos.commits`' .
* 'WHERE author.date < @date AND message = @message LIMIT 100';
* $queryResults = $bigQuery->runQuery($query, [
* 'parameters' => [
* $queryJobConfig = $bigQuery->query($query)
* ->parameters([
* 'date' => $bigQuery->timestamp(new \DateTime('1980-01-01 12:15:00Z')),
* 'message' => 'A commit message.'
* ]
* ]);
* ]);
* $queryResults = $bigQuery->runQuery($queryJobConfig);
*
* foreach ($queryResults as $row) {
* echo $row['commit'];
Expand All @@ -163,9 +240,9 @@ public function __construct(array $config = [])
* ```
* // Construct a query utilizing positional parameters.
* $query = 'SELECT commit FROM `bigquery-public-data.github_repos.commits` WHERE message = ? LIMIT 100';
* $queryResults = $bigQuery->runQuery($query, [
* 'parameters' => ['A commit message.']
* ]);
* $queryJobConfig = $bigQuery->query($query)
* ->parameters(['A commit message.']);
* $queryResults = $bigQuery->runQuery($queryJobConfig);
*
* foreach ($queryResults as $row) {
* echo $row['commit'];
Expand All @@ -174,7 +251,7 @@ public function __construct(array $config = [])
*
* @see https://cloud.google.com/bigquery/docs/reference/v2/jobs/query Query API documentation.
*
* @param string $query A BigQuery SQL query.
* @param QueryJobConfiguration $query A BigQuery SQL query configuration.
* @param array $options [optional] {
* Configuration options.
*
Expand All @@ -187,36 +264,23 @@ public function __construct(array $config = [])
* milliseconds. **Defaults to** `10000` milliseconds (10 seconds).
* @type int $maxRetries The number of times to retry, checking if the
* query has completed. **Defaults to** `100`.
* @type array $parameters Only available for standard SQL queries.
* When providing a non-associative array positional parameters
* (`?`) will be used. When providing an associative array
* named parameters will be used (`@name`).
* @type array $jobConfig Configuration settings for a query job are
* outlined in the [API Docs for `configuration.query`](https://goo.gl/PuRa3I).
* If not provided default settings will be used, with the exception
* of `configuration.query.useLegacySql`, which defaults to `false`
* in this client.
* }
* @return QueryResults
* @throws JobException If the maximum number of retries while waiting for
* query completion has been exceeded.
*/
// NOTE(review): the two signature lines below are the before/after sides of
// a diff; the second one (typed against JobConfigurationInterface) is the
// newer form.
public function runQuery($query, array $options = [])
public function runQuery(JobConfigurationInterface $query, array $options = [])
{
// pluckArray() is defined outside this view; presumably it returns the
// listed keys and also removes them from $options -- TODO confirm in
// ArrayTrait.
$jobOptions = $this->pluckArray([
'parameters',
'jobConfig'
], $options);
// Options that apply to fetching the results rather than to starting the job.
$queryResultsOptions = $this->pluckArray([
'maxResults',
'startIndex',
'timeoutMs',
'maxRetries'
], $options);

// Start the job, then ask it for its results. The plucked result options
// are merged back with $options via array union, so a key present in
// $queryResultsOptions takes precedence. The paired `return` and argument
// lines below are again the before/after sides of the diff.
return $this->runQueryAsJob(
return $this->startQuery(
$query,
$jobOptions + $options
$options
)->queryResults($queryResultsOptions + $options);
}

Expand All @@ -230,7 +294,10 @@ public function runQuery($query, array $options = [])
*
* Example:
* ```
* $job = $bigQuery->runQueryAsJob('SELECT commit FROM `bigquery-public-data.github_repos.commits` LIMIT 100');
* $queryJobConfig = $bigQuery->query(
* 'SELECT commit FROM `bigquery-public-data.github_repos.commits` LIMIT 100'
* );
* $job = $bigQuery->startQuery($queryJobConfig);
* $queryResults = $job->queryResults();
*
* foreach ($queryResults as $row) {
Expand All @@ -240,52 +307,18 @@ public function runQuery($query, array $options = [])
*
* @see https://cloud.google.com/bigquery/docs/reference/v2/jobs/insert Jobs insert API documentation.
*
* @param string $query A BigQuery SQL query.
* @param array $options [optional] {
* Configuration options.
*
* @type array $parameters Only available for standard SQL queries.
* When providing a non-associative array positional parameters
* (`?`) will be used. When providing an associative array
* named parameters will be used (`@name`).
* @type array $jobConfig Configuration settings for a query job are
* outlined in the [API Docs for `configuration.query`](https://goo.gl/PuRa3I).
* If not provided default settings will be used, with the exception
* of `configuration.query.useLegacySql`, which defaults to `false`
* in this client.
* @type string $jobIdPrefix If given, the returned job ID will be of
* format `{$jobIdPrefix-}{jobId}`. **Defaults to** `null`.
* }
* @param QueryJobConfiguration $query A BigQuery SQL query configuration.
* @param array $options [optional] Configuration options.
* @return Job
*/
public function runQueryAsJob($query, array $options = [])
public function startQuery(JobConfigurationInterface $query, array $options = [])
{
$options += [
'jobConfig' => []
];

if (isset($options['parameters'])) {
$options['jobConfig'] += $this->formatQueryParameters($options['parameters']);

unset($options['parameters']);
}

$options['jobConfig'] += [
'useLegacySql' => false
];

$config = $this->buildJobConfig(
'query',
$this->projectId,
['query' => $query],
$options
);

$response = $this->connection->insertJob($config);
$config = $query->toArray();
$response = $this->connection->insertJob($config + $options);

return new Job(
$this->connection,
$response['jobReference']['jobId'],
$config['jobReference']['jobId'],
$this->projectId,
$this->mapper,
$response
Expand All @@ -302,7 +335,7 @@ public function runQueryAsJob($query, array $options = [])
* $job = $bigQuery->job('myJobId');
* ```
*
* @param string $id The id of the job to request.
* @param string $id The ID of a job that has already run or is currently running.
* @return Job
*/
public function job($id)
Expand Down Expand Up @@ -445,6 +478,9 @@ function (array $dataset) {
/**
* Creates a dataset.
*
* Please note that by default the library will not attempt to retry this
* call on your behalf.
*
* Example:
* ```
* $dataset = $bigQuery->createDataset('aDataset');
Expand All @@ -469,12 +505,16 @@ public function createDataset($id, array $options = [])
unset($options['metadata']);
}

$response = $this->connection->insertDataset([
'projectId' => $this->projectId,
'datasetReference' => [
'datasetId' => $id
$response = $this->connection->insertDataset(
[
'projectId' => $this->projectId,
'datasetReference' => [
'datasetId' => $id
]
]
] + $options);
+ $options
+ ['retries' => 0]
);

return new Dataset(
$this->connection,
Expand Down Expand Up @@ -565,30 +605,4 @@ public function timestamp(\DateTimeInterface $value)
{
return new Timestamp($value);
}

/**
 * Converts user-supplied query parameters into the structure sent to the API.
 *
 * The parameter mode is `named` when `$this->isAssoc($parameters)` reports
 * an associative array and `positional` otherwise. Each value is converted
 * with the value mapper's `toParameter()`; in named mode the array key is
 * attached to the converted entry as `name`.
 *
 * Note that the `queryParameters` key is only present in the result when
 * at least one parameter was provided.
 *
 * @param array $parameters The parameters to format.
 * @return array Contains `parameterMode`, `useLegacySql` (always `false`)
 *         and, when parameters were given, `queryParameters`.
 */
private function formatQueryParameters(array $parameters)
{
    // Decide the mode once, up front, instead of re-reading it per entry.
    $isNamed = $this->isAssoc($parameters);

    $formatted = [
        'parameterMode' => $isNamed ? 'named' : 'positional',
        'useLegacySql' => false
    ];

    foreach ($parameters as $key => $rawValue) {
        $entry = $this->mapper->toParameter($rawValue);

        // Array union keeps any `name` already set by the mapper.
        $formatted['queryParameters'][] = $isNamed
            ? $entry + ['name' => $key]
            : $entry;
    }

    return $formatted;
}
}
13 changes: 10 additions & 3 deletions src/BigQuery/Connection/Rest.php
Original file line number Diff line number Diff line change
Expand Up @@ -246,12 +246,19 @@ private function resolveUploadOptions(array $args)
$args += [
'projectId' => null,
'data' => null,
'configuration' => []
'configuration' => [],
'labels' => [],
'dryRun' => false,
'jobReference' => []
];

$args['data'] = Psr7\stream_for($args['data']);
$args['metadata']['configuration'] = $args['configuration'];
unset($args['configuration']);
$args['metadata'] = $this->pluckArray([
'labels',
'dryRun',
'jobReference',
'configuration'
], $args);

$uploaderOptionKeys = [
'restOptions',
Expand Down
Loading