Skip to content
Merged
Show file tree
Hide file tree
Changes from 4 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
144 changes: 124 additions & 20 deletions bigquery/google/cloud/bigquery/job.py
Original file line number Diff line number Diff line change
Expand Up @@ -14,8 +14,12 @@

"""Define API Jobs."""

import threading
import warnings

import six

from google.cloud import exceptions
from google.cloud.exceptions import NotFound
from google.cloud._helpers import _datetime_from_microseconds
from google.cloud.bigquery.dataset import Dataset
Expand All @@ -27,6 +31,7 @@
from google.cloud.bigquery._helpers import UDFResourcesProperty
from google.cloud.bigquery._helpers import _EnumProperty
from google.cloud.bigquery._helpers import _TypedProperty
import google.cloud.future.base

This comment was marked as spam.



class Compression(_EnumProperty):
Expand Down Expand Up @@ -82,16 +87,23 @@ class WriteDisposition(_EnumProperty):
ALLOWED = (WRITE_APPEND, WRITE_TRUNCATE, WRITE_EMPTY)


class _BaseJob(object):
"""Base class for jobs.
class _AsyncJob(google.cloud.future.base.PollingFuture):
"""Base class for asynchronous jobs.

:type name: str
:param name: the name of the job

:type client: :class:`google.cloud.bigquery.client.Client`
:param client: A client which holds credentials and project configuration
for the dataset (which requires a project).
"""
def __init__(self, client):
def __init__(self, name, client):
super(_AsyncJob, self).__init__()
self.name = name
self._client = client
self._properties = {}
self._result_set = False
self._completion_lock = threading.Lock()

@property
def project(self):
Expand All @@ -117,21 +129,6 @@ def _require_client(self, client):
client = self._client
return client


class _AsyncJob(_BaseJob):
"""Base class for asynchronous jobs.

:type name: str
:param name: the name of the job

:type client: :class:`google.cloud.bigquery.client.Client`
:param client: A client which holds credentials and project configuration
for the dataset (which requires a project).
"""
def __init__(self, name, client):
super(_AsyncJob, self).__init__(client)
self.name = name

@property
def job_type(self):
"""Type of job
Expand Down Expand Up @@ -273,6 +270,9 @@ def _set_properties(self, api_response):
self._properties.clear()
self._properties.update(cleaned)

# For Future interface
self._set_future_result()

@classmethod
def _get_resource_config(cls, resource):
"""Helper for :meth:`from_api_repr`
Expand Down Expand Up @@ -345,7 +345,7 @@ def exists(self, client=None):
return True

def reload(self, client=None):
"""API call: refresh job properties via a GET request
"""API call: refresh job properties via a GET request.

See
https://cloud.google.com/bigquery/docs/reference/rest/v2/jobs/get
Expand All @@ -371,12 +371,83 @@ def cancel(self, client=None):
``NoneType``
:param client: the client to use. If not passed, falls back to the
``client`` stored on the current dataset.

:rtype: bool
:returns: Boolean indicating that the cancel request was sent.
"""
client = self._require_client(client)

api_response = client._connection.api_request(
method='POST', path='%s/cancel' % (self.path,))
self._set_properties(api_response['job'])
return True

This comment was marked as spam.

This comment was marked as spam.


# The following methods implement the PollingFuture interface. Note that
# the methods above are from the pre-Future interface and are left for
# compatibility. The only "overloaded" method is :meth:`cancel`, which
# satisfies both interfaces.

def _set_future_result(self):
"""Set the result or exception from the job if it is complete."""
# This must be done in a lock to prevent the polling thread
# and main thread from both executing the completion logic
# at the same time.
with self._completion_lock:
# If the operation isn't complete or if the result has already been
# set, do not call set_result/set_exception again.
# Note: self._result_set is set to True in set_result and
# set_exception, in case those methods are invoked directly.
if self.state != 'DONE' or self._result_set:

This comment was marked as spam.

This comment was marked as spam.

return

if self.error_result is not None:
exception = exceptions.GoogleCloudError(
self.error_result, errors=self.errors)

This comment was marked as spam.

This comment was marked as spam.

This comment was marked as spam.

This comment was marked as spam.

self.set_exception(exception)
else:
self.set_result(self)

def done(self):
"""Refresh the job and checks if it is complete.

:rtype: bool
:returns: True if the job is complete, False otherwise.
"""
# Do not refresh is the state is already done, as the job will not
# change once complete.
if self.state != 'DONE':

This comment was marked as spam.

This comment was marked as spam.

self.reload()
return self.state == 'DONE'

def result(self, timeout=None):
"""Start the job and wait for it to complete and get the result.

:type timeout: int
:param timeout: How long to wait for job to complete before raising
a :class:`TimeoutError`.

:rtype: _AsyncJob
:returns: This instance.

:raises: :class:`~google.cloud.exceptions.GoogleCloudError` if the job
failed or :class:`TimeoutError` if the job did not complete in the
given timeout.
"""
if self.state is None:
self.begin()
return super(_AsyncJob, self).result(timeout=timeout)

def cancelled(self):
"""Check if the job has been cancelled.

This always returns False. It's not possible to check if a job was
cancelled in the API. This method is here to satisfy the interface
for :class:`google.cloud.future.Future`.

:rtype: bool
:returns: False
"""
return False


class _LoadConfiguration(object):
Expand Down Expand Up @@ -1127,11 +1198,44 @@ def from_api_repr(cls, resource, client):
job._set_properties(resource)
return job

def results(self):
def query_results(self):
"""Construct a QueryResults instance, bound to this job.

:rtype: :class:`~google.cloud.bigquery.query.QueryResults`
:returns: results instance
"""
from google.cloud.bigquery.query import QueryResults
return QueryResults.from_query_job(self)

def results(self):
"""DEPRECATED.

This comment was marked as spam.

This comment was marked as spam.


This method is deprecated. Use :meth:`query_results` or :meth:`result`.

Construct a QueryResults instance, bound to this job.

:rtype: :class:`~google.cloud.bigquery.query.QueryResults`
:returns: The query results.
"""
warnings.warn(
'QueryJob.results() is deprecated. Please use query_results() or '
'result().', DeprecationWarning)
return self.query_results()

def result(self, timeout=None):
"""Start the job and wait for it to complete and get the result.

:type timeout: int
:param timeout: How long to wait for job to complete before raising
a :class:`TimeoutError`.

:rtype: :class:`~google.cloud.bigquery.query.QueryResults`
:returns: The query results.

:raises: :class:`~google.cloud.exceptions.GoogleCloudError` if the job
failed or :class:`TimeoutError` if the job did not complete in the
given timeout.
"""
super(QueryJob, self).result(timeout=timeout)
# Return a QueryResults instance instead of returning the job.
return self.query_results()
68 changes: 66 additions & 2 deletions bigquery/tests/unit/test_job.py
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,9 @@
# See the License for the specific language governing permissions and
# limitations under the License.

import copy
import warnings

import unittest


Expand Down Expand Up @@ -1514,15 +1517,76 @@ def test_from_api_repr_w_properties(self):
self.assertIs(dataset._client, client)
self._verifyResourceProperties(dataset, RESOURCE)

def test_results(self):
def test_query_results(self):
from google.cloud.bigquery.query import QueryResults

client = _Client(self.PROJECT)
job = self._make_one(self.JOB_NAME, self.QUERY, client)
results = job.results()
results = job.query_results()
self.assertIsInstance(results, QueryResults)
self.assertIs(results._job, job)

def test_results_is_deprecated(self):
client = _Client(self.PROJECT)
job = self._make_one(self.JOB_NAME, self.QUERY, client)

with warnings.catch_warnings(record=True) as warned:
warnings.simplefilter('always')
job.results()
self.assertEqual(len(warned), 1)
self.assertIn('deprecated', str(warned[0]))

def test_result(self):
from google.cloud.bigquery.query import QueryResults

client = _Client(self.PROJECT)
job = self._make_one(self.JOB_NAME, self.QUERY, client)
job._properties['status'] = {'state': 'DONE'}

result = job.result()

self.assertIsInstance(result, QueryResults)
self.assertIs(result._job, job)

def test_result_invokes_begins(self):
begun_resource = self._makeResource()
done_resource = copy.deepcopy(begun_resource)
done_resource['status'] = {'state': 'DONE'}
connection = _Connection(begun_resource, done_resource)
client = _Client(self.PROJECT, connection=connection)
job = self._make_one(self.JOB_NAME, self.QUERY, client)

job.result()

self.assertEqual(len(connection._requested), 2)
begin_request, reload_request = connection._requested
self.assertEqual(begin_request['method'], 'POST')
self.assertEqual(reload_request['method'], 'GET')

def test_result_error(self):
from google.cloud import exceptions

client = _Client(self.PROJECT)
job = self._make_one(self.JOB_NAME, self.QUERY, client)
error_result = {
'debugInfo': 'DEBUG',
'location': 'LOCATION',
'message': 'MESSAGE',
'reason': 'REASON'
}
job._properties['status'] = {
'errorResult': error_result,
'errors': [error_result],
'state': 'DONE'
}
job._set_future_result()

with self.assertRaises(exceptions.GoogleCloudError) as exc_info:
job.result()

self.assertIsInstance(exc_info.exception, exceptions.GoogleCloudError)
self.assertEqual(exc_info.exception.errors, [error_result])

def test_begin_w_bound_client(self):
PATH = '/projects/%s/jobs' % (self.PROJECT,)
RESOURCE = self._makeResource()
Expand Down