diff --git a/bigquery/google/cloud/bigquery/job.py b/bigquery/google/cloud/bigquery/job.py index 4f791bdbea0c..35a423b755b9 100644 --- a/bigquery/google/cloud/bigquery/job.py +++ b/bigquery/google/cloud/bigquery/job.py @@ -14,8 +14,14 @@ """Define API Jobs.""" +import collections +import threading +import warnings + import six +from six.moves import http_client +from google.cloud import exceptions from google.cloud.exceptions import NotFound from google.cloud._helpers import _datetime_from_microseconds from google.cloud.bigquery.dataset import Dataset @@ -27,6 +33,60 @@ from google.cloud.bigquery._helpers import UDFResourcesProperty from google.cloud.bigquery._helpers import _EnumProperty from google.cloud.bigquery._helpers import _TypedProperty +import google.cloud.future.base + +_DONE_STATE = 'DONE' +_STOPPED_REASON = 'stopped' + +_ERROR_REASON_TO_EXCEPTION = { + 'accessDenied': http_client.FORBIDDEN, + 'backendError': http_client.INTERNAL_SERVER_ERROR, + 'billingNotEnabled': http_client.FORBIDDEN, + 'billingTierLimitExceeded': http_client.BAD_REQUEST, + 'blocked': http_client.FORBIDDEN, + 'duplicate': http_client.CONFLICT, + 'internalError': http_client.INTERNAL_SERVER_ERROR, + 'invalid': http_client.BAD_REQUEST, + 'invalidQuery': http_client.BAD_REQUEST, + 'notFound': http_client.NOT_FOUND, + 'notImplemented': http_client.NOT_IMPLEMENTED, + 'quotaExceeded': http_client.FORBIDDEN, + 'rateLimitExceeded': http_client.FORBIDDEN, + 'resourceInUse': http_client.BAD_REQUEST, + 'resourcesExceeded': http_client.BAD_REQUEST, + 'responseTooLarge': http_client.FORBIDDEN, + 'stopped': http_client.OK, + 'tableUnavailable': http_client.BAD_REQUEST, +} + +_FakeResponse = collections.namedtuple('_FakeResponse', ['status']) + + +def _error_result_to_exception(error_result): + """Maps BigQuery error reasons to an exception. + + The reasons and their matching HTTP status codes are documented on + the `troubleshooting errors`_ page. + + .. 
_troubleshooting errors: https://cloud.google.com/bigquery\ + /troubleshooting-errors + + :type error_result: Mapping[str, str] + :param error_result: The error result from BigQuery. + + :rtype: google.cloud.exceptions.GoogleCloudError + :returns: The mapped exception. + """ + reason = error_result.get('reason') + status_code = _ERROR_REASON_TO_EXCEPTION.get( + reason, http_client.INTERNAL_SERVER_ERROR) + # make_exception expects an httplib2 response object. + fake_response = _FakeResponse(status=status_code) + return exceptions.make_exception( + fake_response, + error_result.get('message', ''), + error_info=error_result, + use_json=False) class Compression(_EnumProperty): @@ -82,16 +142,23 @@ class WriteDisposition(_EnumProperty): ALLOWED = (WRITE_APPEND, WRITE_TRUNCATE, WRITE_EMPTY) -class _BaseJob(object): - """Base class for jobs. +class _AsyncJob(google.cloud.future.base.PollingFuture): + """Base class for asynchronous jobs. + + :type name: str + :param name: the name of the job :type client: :class:`google.cloud.bigquery.client.Client` :param client: A client which holds credentials and project configuration for the dataset (which requires a project). """ - def __init__(self, client): + def __init__(self, name, client): + super(_AsyncJob, self).__init__() + self.name = name self._client = client self._properties = {} + self._result_set = False + self._completion_lock = threading.Lock() @property def project(self): @@ -117,21 +184,6 @@ def _require_client(self, client): client = self._client return client - -class _AsyncJob(_BaseJob): - """Base class for asynchronous jobs. - - :type name: str - :param name: the name of the job - - :type client: :class:`google.cloud.bigquery.client.Client` - :param client: A client which holds credentials and project configuration - for the dataset (which requires a project). 
- """ - def __init__(self, name, client): - super(_AsyncJob, self).__init__(client) - self.name = name - @property def job_type(self): """Type of job @@ -273,6 +325,9 @@ def _set_properties(self, api_response): self._properties.clear() self._properties.update(cleaned) + # For Future interface + self._set_future_result() + @classmethod def _get_resource_config(cls, resource): """Helper for :meth:`from_api_repr` @@ -345,7 +400,7 @@ def exists(self, client=None): return True def reload(self, client=None): - """API call: refresh job properties via a GET request + """API call: refresh job properties via a GET request. See https://cloud.google.com/bigquery/docs/reference/rest/v2/jobs/get @@ -371,12 +426,85 @@ def cancel(self, client=None): ``NoneType`` :param client: the client to use. If not passed, falls back to the ``client`` stored on the current dataset. + + :rtype: bool + :returns: Boolean indicating that the cancel request was sent. """ client = self._require_client(client) api_response = client._connection.api_request( method='POST', path='%s/cancel' % (self.path,)) self._set_properties(api_response['job']) + # The Future interface requires that we return True if the *attempt* + # to cancel was successful. + return True + + # The following methods implement the PollingFuture interface. Note that + # the methods above are from the pre-Future interface and are left for + # compatibility. The only "overloaded" method is :meth:`cancel`, which + # satisfies both interfaces. + + def _set_future_result(self): + """Set the result or exception from the job if it is complete.""" + # This must be done in a lock to prevent the polling thread + # and main thread from both executing the completion logic + # at the same time. + with self._completion_lock: + # If the operation isn't complete or if the result has already been + # set, do not call set_result/set_exception again. 
+ # Note: self._result_set is set to True in set_result and + # set_exception, in case those methods are invoked directly. + if self.state != _DONE_STATE or self._result_set: + return + + if self.error_result is not None: + exception = _error_result_to_exception(self.error_result) + self.set_exception(exception) + else: + self.set_result(self) + + def done(self): + """Refresh the job and check if it is complete. + + :rtype: bool + :returns: True if the job is complete, False otherwise. + """ + # Do not refresh if the state is already done, as the job will not + # change once complete. + if self.state != _DONE_STATE: + self.reload() + return self.state == _DONE_STATE + + def result(self, timeout=None): + """Start the job and wait for it to complete and get the result. + + :type timeout: int + :param timeout: How long to wait for job to complete before raising + a :class:`TimeoutError`. + + :rtype: _AsyncJob + :returns: This instance. + + :raises: :class:`~google.cloud.exceptions.GoogleCloudError` if the job + failed or :class:`TimeoutError` if the job did not complete in the + given timeout. + """ + if self.state is None: + self.begin() + return super(_AsyncJob, self).result(timeout=timeout) + + def cancelled(self): + """Check if the job has been cancelled. + + This returns True only when the job's error result has the reason + 'stopped'; otherwise it returns False. This method is here to satisfy + the interface for :class:`google.cloud.future.Future`. + + :rtype: bool + :returns: True if the job was stopped, False otherwise. + """ + return (self.error_result is not None + and self.error_result.get('reason') == _STOPPED_REASON) class _LoadConfiguration(object): @@ -1127,7 +1255,7 @@ def from_api_repr(cls, resource, client): job._set_properties(resource) return job - def results(self): + def query_results(self): """Construct a QueryResults instance, bound to this job. 
:rtype: :class:`~google.cloud.bigquery.query.QueryResults` @@ -1135,3 +1263,36 @@ def results(self): """ from google.cloud.bigquery.query import QueryResults return QueryResults.from_query_job(self) + + def results(self): + """DEPRECATED. + + This method is deprecated. Use :meth:`query_results` or :meth:`result`. + + Construct a QueryResults instance, bound to this job. + + :rtype: :class:`~google.cloud.bigquery.query.QueryResults` + :returns: The query results. + """ + warnings.warn( + 'QueryJob.results() is deprecated. Please use query_results() or ' + 'result().', DeprecationWarning) + return self.query_results() + + def result(self, timeout=None): + """Start the job and wait for it to complete and get the result. + + :type timeout: int + :param timeout: How long to wait for job to complete before raising + a :class:`TimeoutError`. + + :rtype: :class:`~google.cloud.bigquery.query.QueryResults` + :returns: The query results. + + :raises: :class:`~google.cloud.exceptions.GoogleCloudError` if the job + failed or :class:`TimeoutError` if the job did not complete in the + given timeout. + """ + super(QueryJob, self).result(timeout=timeout) + # Return a QueryResults instance instead of returning the job. 
+ return self.query_results() diff --git a/bigquery/tests/system.py b/bigquery/tests/system.py index 3391ec2bd2d8..1d3da3d2a83d 100644 --- a/bigquery/tests/system.py +++ b/bigquery/tests/system.py @@ -19,6 +19,7 @@ import os import time import unittest +import uuid from google.cloud import bigquery from google.cloud._helpers import UTC @@ -1013,6 +1014,15 @@ def test_large_query_w_public_data(self): rows = list(iterator) self.assertEqual(len(rows), LIMIT) + def test_async_query_future(self): + query_job = Config.CLIENT.run_async_query( + str(uuid.uuid4()), 'SELECT 1') + query_job.use_legacy_sql = False + + iterator = query_job.result().fetch_data() + rows = list(iterator) + self.assertEqual(rows, [(1,)]) + def test_insert_nested_nested(self): # See #2951 SF = bigquery.SchemaField diff --git a/bigquery/tests/unit/test_job.py b/bigquery/tests/unit/test_job.py index 57d96bf8ae15..8b9d079df148 100644 --- a/bigquery/tests/unit/test_job.py +++ b/bigquery/tests/unit/test_job.py @@ -12,9 +12,34 @@ # See the License for the specific language governing permissions and # limitations under the License. 
+import copy +import warnings + +from six.moves import http_client import unittest +class Test__error_result_to_exception(unittest.TestCase): + def _call_fut(self, *args, **kwargs): + from google.cloud.bigquery import job + return job._error_result_to_exception(*args, **kwargs) + + def test_simple(self): + error_result = { + 'reason': 'invalid', + 'message': 'bad request' + } + exception = self._call_fut(error_result) + self.assertEqual(exception.code, http_client.BAD_REQUEST) + self.assertTrue(exception.message.startswith('bad request')) + self.assertIn("'reason': 'invalid'", exception.message) + + def test_missing_reason(self): + error_result = {} + exception = self._call_fut(error_result) + self.assertEqual(exception.code, http_client.INTERNAL_SERVER_ERROR) + + class _Base(object): PROJECT = 'project' SOURCE1 = 'http://example.com/source1.csv' @@ -1514,15 +1539,88 @@ def test_from_api_repr_w_properties(self): self.assertIs(dataset._client, client) self._verifyResourceProperties(dataset, RESOURCE) - def test_results(self): + def test_cancelled(self): + client = _Client(self.PROJECT) + job = self._make_one(self.JOB_NAME, self.QUERY, client) + job._properties['status'] = { + 'state': 'DONE', + 'errorResult': { + 'reason': 'stopped' + } + } + + self.assertTrue(job.cancelled()) + + def test_query_results(self): from google.cloud.bigquery.query import QueryResults client = _Client(self.PROJECT) job = self._make_one(self.JOB_NAME, self.QUERY, client) - results = job.results() + results = job.query_results() self.assertIsInstance(results, QueryResults) self.assertIs(results._job, job) + def test_results_is_deprecated(self): + client = _Client(self.PROJECT) + job = self._make_one(self.JOB_NAME, self.QUERY, client) + + with warnings.catch_warnings(record=True) as warned: + warnings.simplefilter('always') + job.results() + self.assertEqual(len(warned), 1) + self.assertIn('deprecated', str(warned[0])) + + def test_result(self): + from google.cloud.bigquery.query import 
QueryResults + + client = _Client(self.PROJECT) + job = self._make_one(self.JOB_NAME, self.QUERY, client) + job._properties['status'] = {'state': 'DONE'} + + result = job.result() + + self.assertIsInstance(result, QueryResults) + self.assertIs(result._job, job) + + def test_result_invokes_begins(self): + begun_resource = self._makeResource() + done_resource = copy.deepcopy(begun_resource) + done_resource['status'] = {'state': 'DONE'} + connection = _Connection(begun_resource, done_resource) + client = _Client(self.PROJECT, connection=connection) + job = self._make_one(self.JOB_NAME, self.QUERY, client) + + job.result() + + self.assertEqual(len(connection._requested), 2) + begin_request, reload_request = connection._requested + self.assertEqual(begin_request['method'], 'POST') + self.assertEqual(reload_request['method'], 'GET') + + def test_result_error(self): + from google.cloud import exceptions + + client = _Client(self.PROJECT) + job = self._make_one(self.JOB_NAME, self.QUERY, client) + error_result = { + 'debugInfo': 'DEBUG', + 'location': 'LOCATION', + 'message': 'MESSAGE', + 'reason': 'invalid' + } + job._properties['status'] = { + 'errorResult': error_result, + 'errors': [error_result], + 'state': 'DONE' + } + job._set_future_result() + + with self.assertRaises(exceptions.GoogleCloudError) as exc_info: + job.result() + + self.assertIsInstance(exc_info.exception, exceptions.GoogleCloudError) + self.assertEqual(exc_info.exception.code, http_client.BAD_REQUEST) + def test_begin_w_bound_client(self): PATH = '/projects/%s/jobs' % (self.PROJECT,) RESOURCE = self._makeResource()