Skip to content
This repository was archived by the owner on Mar 20, 2018. It is now read-only.
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
215 changes: 0 additions & 215 deletions google/gax/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -33,16 +33,9 @@

import collections
import logging
import multiprocessing as mp

import dill
from grpc import RpcError, StatusCode
import pkg_resources

from google.gax.errors import GaxError
from google.gax.retry import retryable
from google.rpc import code_pb2

# pylint: disable=no-member
__version__ = pkg_resources.get_distribution('google-gax').version
# pylint: enable=no-member
Expand Down Expand Up @@ -502,211 +495,3 @@ def __next__(self):
if self._index >= len(self._current):
self._current = None
return resource


def _from_any(pb_type, any_pb):
"""Converts an Any protobuf to the specified message type

Args:
pb_type (type): the type of the message that any_pb stores an instance
of.
any_pb (google.protobuf.any_pb2.Any): the object to be converted.

Returns:
pb_type: An instance of the pb_type message.

Raises:
TypeError: if the message could not be converted.
"""
msg = pb_type()
# Check exceptional case: raise if can't Unpack
if not any_pb.Unpack(msg):
raise TypeError(
'Could not convert {} to {}'.format(
any_pb.__class__.__name__, pb_type.__name__))

# Return expected message
return msg


def _try_callback(target, clbk):
try:
clbk(target)
except Exception as ex: # pylint: disable=broad-except
_LOG.exception(ex)


class _DeadlineExceededError(RpcError, GaxError):

def __init__(self):
super(_DeadlineExceededError, self).__init__('Deadline Exceeded')

def code(self): # pylint: disable=no-self-use
"""Always returns StatusCode.DEADLINE_EXCEEDED"""
return StatusCode.DEADLINE_EXCEEDED


class _OperationFuture(object):
"""A Future which polls a service for completion via OperationsClient."""

def __init__(self, operation, client, result_type, metadata_type,
call_options=None):
"""
Args:
operation (google.longrunning.Operation): the initial long-running
operation object.
client
(google.gapic.longrunning.operations_client.OperationsClient):
a client for the long-running operation service.
result_type (type): the class type of the result.
metadata_type (Optional[type]): the class type of the metadata.
call_options (Optional[google.gax.CallOptions]): the call options
that are used when reloading the operation.
"""
self._operation = operation
self._client = client
self._result_type = result_type
self._metadata_type = metadata_type
self._call_options = call_options
self._queue = mp.Queue()
self._process = None

def cancel(self):
"""If last Operation's value of `done` is true, returns false;
otherwise, issues OperationsClient.cancel_operation and returns true.
"""
if self.done():
return False

self._client.cancel_operation(self._operation.name)
return True

def result(self, timeout=None):
"""Enters polling loop on OperationsClient.get_operation, and once
Operation.done is true, then returns Operation.response if successful or
throws GaxError if not successful.

This method will wait up to timeout seconds. If the call hasn't
completed in timeout seconds, then a RetryError will be raised. timeout
can be an int or float. If timeout is not specified or None, there is no
limit to the wait time.
"""
# Check exceptional case: raise if no response
if not self._poll(timeout).HasField('response'):
raise GaxError(self._operation.error.message)

# Return expected result
return _from_any(self._result_type, self._operation.response)

def exception(self, timeout=None):
"""Similar to result(), except returns the exception if any."""
# Check exceptional case: return none if no error
if not self._poll(timeout).HasField('error'):
return None

# Return expected error
return self._operation.error

def cancelled(self):
"""Return True if the call was successfully cancelled."""
self._get_operation()
return (self._operation.HasField('error') and
self._operation.error.code == code_pb2.CANCELLED)

def done(self):
"""Issues OperationsClient.get_operation and returns value of
Operation.done.
"""
return self._get_operation().done

def add_done_callback(self, fn): # pylint: disable=invalid-name
"""Enters a polling loop on OperationsClient.get_operation, and once the
operation is done or cancelled, calls the function with this
_OperationFuture. Added callables are called in the order that they were
added.
"""
if self._operation.done:
_try_callback(self, fn)
else:
self._queue.put(dill.dumps(fn))
if self._process is None:
self._process = mp.Process(target=self._execute_tasks)
self._process.start()

def operation_name(self):
"""Returns the value of Operation.name."""
return self._operation.name

def metadata(self):
"""Returns the value of Operation.metadata from the last call to
OperationsClient.get_operation (or if only the initial API call has been
made, the metadata from that first call).
"""
# Check exceptional case: return none if no metadata
if not self._operation.HasField('metadata'):
return None

# Return expected metadata
return _from_any(self._metadata_type, self._operation.metadata)

def last_operation_data(self):
"""Returns the data from the last call to OperationsClient.get_operation
(or if only the initial API call has been made, the data from that first
call).
"""
return self._operation

def _get_operation(self):
if not self._operation.done:
self._operation = self._client.get_operation(
self._operation.name, self._call_options)

return self._operation

def _poll(self, timeout=None):
def _done_check(_):
# Check exceptional case: raise if in progress
if not self.done():
raise _DeadlineExceededError()

# Return expected operation
return self._operation

# If a timeout is set, then convert it to milliseconds.
#
# Also, we need to send 0 instead of None for the rpc arguments,
# because an internal method (`_has_timeout_settings`) will
# erroneously return False otherwise.
rpc_arg = None
if timeout is not None:
timeout *= 1000
rpc_arg = 0

# Set the backoff settings. We have specific backoff settings
# for "are we there yet" calls that are distinct from those configured
# in the config.json files.
backoff_settings = BackoffSettings(
initial_retry_delay_millis=1000,
retry_delay_multiplier=2,
max_retry_delay_millis=30000,
initial_rpc_timeout_millis=rpc_arg,
rpc_timeout_multiplier=rpc_arg,
max_rpc_timeout_millis=rpc_arg,
total_timeout_millis=timeout,
)

# Set the retry to retry if `_done_check` raises the
# _DeadlineExceededError, according to the given backoff settings.
retry_options = RetryOptions(
[StatusCode.DEADLINE_EXCEEDED], backoff_settings)
retryable_done_check = retryable(_done_check, retry_options)

# Start polling, and return the final result from `_done_check`.
return retryable_done_check()

def _execute_tasks(self):
self._poll()

while not self._queue.empty():
task = dill.loads(self._queue.get())
_try_callback(self, task)
36 changes: 36 additions & 0 deletions google/gax/future/__init__.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,36 @@
# Copyright 2017, Google Inc.
# All rights reserved.
#
# Redistribution and use in source and binary forms, with or without
# modification, are permitted provided that the following conditions are
# met:
#
# * Redistributions of source code must retain the above copyright
# notice, this list of conditions and the following disclaimer.
# * Redistributions in binary form must reproduce the above
# copyright notice, this list of conditions and the following disclaimer
# in the documentation and/or other materials provided with the
# distribution.
# * Neither the name of Google Inc. nor the names of its
# contributors may be used to endorse or promote products derived from
# this software without specific prior written permission.
#
# THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS
# "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT
# LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR
# A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT
# OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL,
# SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT
# LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE,
# DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY
# THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT
# (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE
# OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.

"""Futures for dealing with asynchronous operations."""

from google.gax.future.base import Future

__all__ = [
'Future',
]
130 changes: 130 additions & 0 deletions google/gax/future/_helpers.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,130 @@
# Copyright 2017, Google Inc.
# All rights reserved.
#
# Redistribution and use in source and binary forms, with or without
# modification, are permitted provided that the following conditions are
# met:
#
# * Redistributions of source code must retain the above copyright
# notice, this list of conditions and the following disclaimer.
# * Redistributions in binary form must reproduce the above
# copyright notice, this list of conditions and the following disclaimer
# in the documentation and/or other materials provided with the
# distribution.
# * Neither the name of Google Inc. nor the names of its
# contributors may be used to endorse or promote products derived from
# this software without specific prior written permission.
#
# THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS
# "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT
# LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR
# A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT
# OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL,
# SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT
# LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE,
# DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY
# THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT
# (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE
# OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.

"""Private helpers for futures."""

import logging
import threading

from google import gax
from google.gax import retry


_LOGGER = logging.getLogger(__name__)


def from_any(pb_type, any_pb):
"""Converts an Any protobuf to the specified message type

Args:
pb_type (type): the type of the message that any_pb stores an instance
of.
any_pb (google.protobuf.any_pb2.Any): the object to be converted.

Returns:
pb_type: An instance of the pb_type message.

Raises:
TypeError: if the message could not be converted.
"""
msg = pb_type()
if not any_pb.Unpack(msg):
raise TypeError(
'Could not convert {} to {}'.format(
any_pb.__class__.__name__, pb_type.__name__))

return msg


def start_daemon_thread(*args, **kwargs):
"""Starts a thread and marks it as a daemon thread."""
thread = threading.Thread(*args, **kwargs)
thread.daemon = True
thread.start()
return thread


def safe_invoke_callback(callback, *args, **kwargs):
"""Invoke a callback, swallowing and logging any exceptions."""
# pylint: disable=bare-except
# We intentionally want to swallow all exceptions.
try:
callback(*args, **kwargs)
except:
_LOGGER.exception('Error while executing Future callback.')


def blocking_poll(poll_once_func, retry_codes, timeout=None):
"""A pattern for repeatedly polling a function.

This pattern uses gax's retry and backoff functionality to continuously
poll a function. The function can raises
:class:`google.gax.errors.TimeoutError` to indicate that it should be
polled again. This pattern will continue to call the function until the
timeout expires or the function returns a value.

Args:
poll_once_func (Callable): The function to invoke.
retry_codes (Sequence[str]): a list of Google API error codes that
signal a retry should happen.
timeout (int): The maximum number of seconds to poll.

Returns:
Any: The final result of invoking the function.
"""
# If a timeout is set, then convert it to milliseconds.
#
# Also, we need to send 0 instead of None for the rpc arguments,
# because an internal method (`_has_timeout_settings`) will
# erroneously return False otherwise.
rpc_timeout = None
if timeout is not None:
timeout *= 1000
rpc_timeout = 0

# Set the backoff settings. We have specific backoff settings
# for "are we there yet" calls that are distinct from those configured
# in the config.json files.
backoff_settings = gax.BackoffSettings(
initial_retry_delay_millis=1000,
retry_delay_multiplier=2,
max_retry_delay_millis=30000,
initial_rpc_timeout_millis=rpc_timeout,
rpc_timeout_multiplier=rpc_timeout,
max_rpc_timeout_millis=rpc_timeout,
total_timeout_millis=timeout,
)

# Set the retry to retry if poll_once_func raises the
# a deadline exceeded error, according to the given backoff settings.
retry_options = gax.RetryOptions(retry_codes, backoff_settings)
retryable_poll = retry.retryable(poll_once_func, retry_options)

# Start polling, and return the final result from the poll_once_func.
return retryable_poll()
Loading