diff --git a/bigquery/google/cloud/bigquery/job.py b/bigquery/google/cloud/bigquery/job.py index 3e6a9f93418b..ef5353f9ff14 100644 --- a/bigquery/google/cloud/bigquery/job.py +++ b/bigquery/google/cloud/bigquery/job.py @@ -32,7 +32,7 @@ from google.cloud.bigquery._helpers import UDFResourcesProperty from google.cloud.bigquery._helpers import _EnumProperty from google.cloud.bigquery._helpers import _TypedProperty -import google.cloud.future.base +import google.cloud.future.polling _DONE_STATE = 'DONE' _STOPPED_REASON = 'stopped' @@ -141,7 +141,7 @@ class WriteDisposition(_EnumProperty): ALLOWED = (WRITE_APPEND, WRITE_TRUNCATE, WRITE_EMPTY) -class _AsyncJob(google.cloud.future.base.PollingFuture): +class _AsyncJob(google.cloud.future.polling.PollingFuture): """Base class for asynchronous jobs. :type name: str diff --git a/core/google/cloud/future/base.py b/core/google/cloud/future/base.py index aed1dfd80e5d..243913640d62 100644 --- a/core/google/cloud/future/base.py +++ b/core/google/cloud/future/base.py @@ -15,14 +15,8 @@ """Abstract and helper bases for Future implementations.""" import abc -import concurrent.futures -import functools -import operator import six -import tenacity - -from google.cloud.future import _helpers @six.add_metaclass(abc.ABCMeta) @@ -71,146 +65,3 @@ def set_result(self, result): @abc.abstractmethod def set_exception(self, exception): raise NotImplementedError() - - -class PollingFuture(Future): - """A Future that needs to poll some service to check its status. - - The :meth:`done` method should be implemented by subclasses. The polling - behavior will repeatedly call ``done`` until it returns True. - - .. note: Privacy here is intended to prevent the final class from - overexposing, not to prevent subclasses from accessing methods. - """ - def __init__(self): - super(PollingFuture, self).__init__() - self._result = None - self._exception = None - self._result_set = False - """bool: Set to True when the result has been set via set_result or - set_exception.""" - self._polling_thread = None - self._done_callbacks = [] - - @abc.abstractmethod - def done(self): - """Checks to see if the operation is complete. - - Returns: - bool: True if the operation is complete, False otherwise. - """ - # pylint: disable=redundant-returns-doc, missing-raises-doc - raise NotImplementedError() - - def running(self): - """True if the operation is currently running.""" - return not self.done() - - def _blocking_poll(self, timeout=None): - """Poll and wait for the Future to be resolved. - - Args: - timeout (int): How long to wait for the operation to complete. - If None, wait indefinitely. - """ - if self._result_set: - return - - retry_on = tenacity.retry_if_result( - functools.partial(operator.is_not, True)) - # Use exponential backoff with jitter. - wait_on = ( - tenacity.wait_exponential(multiplier=1, max=10) + - tenacity.wait_random(0, 1)) - - if timeout is None: - retry = tenacity.retry(retry=retry_on, wait=wait_on) - else: - retry = tenacity.retry( - retry=retry_on, - wait=wait_on, - stop=tenacity.stop_after_delay(timeout)) - - try: - retry(self.done)() - except tenacity.RetryError as exc: - six.raise_from( - concurrent.futures.TimeoutError( - 'Operation did not complete within the designated ' - 'timeout.'), - exc) - - def result(self, timeout=None): - """Get the result of the operation, blocking if necessary. - - Args: - timeout (int): How long to wait for the operation to complete. - If None, wait indefinitely. - - Returns: - google.protobuf.Message: The Operation's result. - - Raises: - google.gax.GaxError: If the operation errors or if the timeout is - reached before the operation completes. - """ - self._blocking_poll(timeout=timeout) - - if self._exception is not None: - # pylint: disable=raising-bad-type - # Pylint doesn't recognize that this is valid in this case. - raise self._exception - - return self._result - - def exception(self, timeout=None): - """Get the exception from the operation, blocking if necessary. - - Args: - timeout (int): How long to wait for the operation to complete. - If None, wait indefinitely. - - Returns: - Optional[google.gax.GaxError]: The operation's error. - """ - self._blocking_poll() - return self._exception - - def add_done_callback(self, fn): - """Add a callback to be executed when the operation is complete. - - If the operation is not already complete, this will start a helper - thread to poll for the status of the operation in the background. - - Args: - fn (Callable[Future]): The callback to execute when the operation - is complete. - """ - if self._result_set: - _helpers.safe_invoke_callback(fn, self) - return - - self._done_callbacks.append(fn) - - if self._polling_thread is None: - # The polling thread will exit on its own as soon as the operation - # is done. - self._polling_thread = _helpers.start_daemon_thread( - target=self._blocking_poll) - - def _invoke_callbacks(self, *args, **kwargs): - """Invoke all done callbacks.""" - for callback in self._done_callbacks: - _helpers.safe_invoke_callback(callback, *args, **kwargs) - - def set_result(self, result): - """Set the Future's result.""" - self._result = result - self._result_set = True - self._invoke_callbacks(self) - - def set_exception(self, exception): - """Set the Future's exception.""" - self._exception = exception - self._result_set = True - self._invoke_callbacks(self) diff --git a/core/google/cloud/future/operation.py b/core/google/cloud/future/operation.py index 8064e5c13e1f..21da738ca0ff 100644 --- a/core/google/cloud/future/operation.py +++ b/core/google/cloud/future/operation.py @@ -23,10 +23,10 @@ from google.cloud import _helpers from google.cloud import exceptions -from google.cloud.future import base +from google.cloud.future import polling -class Operation(base.PollingFuture): +class Operation(polling.PollingFuture): """A Future for interacting with a Google API Long-Running Operation. Args: diff --git a/core/google/cloud/future/polling.py b/core/google/cloud/future/polling.py new file mode 100644 index 000000000000..6b7ae4221f64 --- /dev/null +++ b/core/google/cloud/future/polling.py @@ -0,0 +1,169 @@ +# Copyright 2017, Google Inc. +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. + +"""Abstract and helper bases for Future implementations.""" + +import abc +import concurrent.futures +import functools +import operator + +import six +import tenacity + +from google.cloud.future import _helpers +from google.cloud.future import base + + +class PollingFuture(base.Future): + """A Future that needs to poll some service to check its status. + + The :meth:`done` method should be implemented by subclasses. The polling + behavior will repeatedly call ``done`` until it returns True. + + .. note: Privacy here is intended to prevent the final class from + overexposing, not to prevent subclasses from accessing methods. + """ + def __init__(self): + super(PollingFuture, self).__init__() + self._result = None + self._exception = None + self._result_set = False + """bool: Set to True when the result has been set via set_result or + set_exception.""" + self._polling_thread = None + self._done_callbacks = [] + + @abc.abstractmethod + def done(self): + """Checks to see if the operation is complete. + + Returns: + bool: True if the operation is complete, False otherwise. + """ + # pylint: disable=redundant-returns-doc, missing-raises-doc + raise NotImplementedError() + + def running(self): + """True if the operation is currently running.""" + return not self.done() + + def _blocking_poll(self, timeout=None): + """Poll and wait for the Future to be resolved. + + Args: + timeout (int): How long to wait for the operation to complete. + If None, wait indefinitely. + """ + if self._result_set: + return + + retry_on = tenacity.retry_if_result( + functools.partial(operator.is_not, True)) + # Use exponential backoff with jitter. + wait_on = ( + tenacity.wait_exponential(multiplier=1, max=10) + + tenacity.wait_random(0, 1)) + + if timeout is None: + retry = tenacity.retry(retry=retry_on, wait=wait_on) + else: + retry = tenacity.retry( + retry=retry_on, + wait=wait_on, + stop=tenacity.stop_after_delay(timeout)) + + try: + retry(self.done)() + except tenacity.RetryError as exc: + six.raise_from( + concurrent.futures.TimeoutError( + 'Operation did not complete within the designated ' + 'timeout.'), + exc) + + def result(self, timeout=None): + """Get the result of the operation, blocking if necessary. + + Args: + timeout (int): How long to wait for the operation to complete. + If None, wait indefinitely. + + Returns: + google.protobuf.Message: The Operation's result. + + Raises: + google.gax.GaxError: If the operation errors or if the timeout is + reached before the operation completes. + """ + self._blocking_poll(timeout=timeout) + + if self._exception is not None: + # pylint: disable=raising-bad-type + # Pylint doesn't recognize that this is valid in this case. + raise self._exception + + return self._result + + def exception(self, timeout=None): + """Get the exception from the operation, blocking if necessary. + + Args: + timeout (int): How long to wait for the operation to complete. + If None, wait indefinitely. + + Returns: + Optional[google.gax.GaxError]: The operation's error. + """ + self._blocking_poll() + return self._exception + + def add_done_callback(self, fn): + """Add a callback to be executed when the operation is complete. + + If the operation is not already complete, this will start a helper + thread to poll for the status of the operation in the background. + + Args: + fn (Callable[Future]): The callback to execute when the operation + is complete. + """ + if self._result_set: + _helpers.safe_invoke_callback(fn, self) + return + + self._done_callbacks.append(fn) + + if self._polling_thread is None: + # The polling thread will exit on its own as soon as the operation + # is done. + self._polling_thread = _helpers.start_daemon_thread( + target=self._blocking_poll) + + def _invoke_callbacks(self, *args, **kwargs): + """Invoke all done callbacks.""" + for callback in self._done_callbacks: + _helpers.safe_invoke_callback(callback, *args, **kwargs) + + def set_result(self, result): + """Set the Future's result.""" + self._result = result + self._result_set = True + self._invoke_callbacks(self) + + def set_exception(self, exception): + """Set the Future's exception.""" + self._exception = exception + self._result_set = True + self._invoke_callbacks(self) diff --git a/core/tests/unit/future/test_operation.py b/core/tests/unit/future/test_operation.py index 0e29aa687ee6..2d281694001a 100644 --- a/core/tests/unit/future/test_operation.py +++ b/core/tests/unit/future/test_operation.py @@ -61,7 +61,7 @@ def make_operation_future(client_operations_responses=None): def test_constructor(): - future, refresh, cancel = make_operation_future() + future, refresh, _ = make_operation_future() assert future.operation == refresh.responses[0] assert future.operation.done is False diff --git a/core/tests/unit/future/test_base.py b/core/tests/unit/future/test_polling.py similarity index 97% rename from core/tests/unit/future/test_base.py rename to core/tests/unit/future/test_polling.py index 69a0348e68d9..c8fde1c20385 100644 --- a/core/tests/unit/future/test_base.py +++ b/core/tests/unit/future/test_polling.py @@ -19,10 +19,10 @@ import mock import pytest -from google.cloud.future import base +from google.cloud.future import polling -class PollingFutureImpl(base.PollingFuture): +class PollingFutureImpl(polling.PollingFuture): def done(self): return False