Skip to content

Commit

Permalink
Merge pull request #2336 from dhermes/fix-0.13.0-gax
Browse files Browse the repository at this point in the history
Fixing broken exception handling after GAX 0.13.0 upgrade.
  • Loading branch information
dhermes authored Sep 19, 2016
2 parents 5f1cda9 + b185ee8 commit 6fb2d73
Show file tree
Hide file tree
Showing 15 changed files with 193 additions and 232 deletions.
19 changes: 0 additions & 19 deletions google/cloud/_helpers.py
Original file line number Diff line number Diff line change
Expand Up @@ -31,13 +31,9 @@
except ImportError:
app_identity = None
try:
from google.gax.grpc import exc_to_code as beta_exc_to_code
import grpc
from grpc._channel import _Rendezvous
except ImportError: # pragma: NO COVER
beta_exc_to_code = None
grpc = None
_Rendezvous = Exception
import six
from six.moves import http_client
from six.moves import configparser
Expand Down Expand Up @@ -685,21 +681,6 @@ def make_insecure_stub(stub_class, host, port=None):
return stub_class(channel)


def exc_to_code(exc):
"""Retrieves the status code from a gRPC exception.
:type exc: :class:`Exception`
:param exc: An exception from gRPC beta or stable.
:rtype: :class:`grpc.StatusCode`
:returns: The status code attached to the exception.
"""
if isinstance(exc, _Rendezvous):
return exc.code()
else:
return beta_exc_to_code(exc)


try:
from pytz import UTC # pylint: disable=unused-import,wrong-import-order
except ImportError:
Expand Down
2 changes: 1 addition & 1 deletion google/cloud/bigtable/row_data.py
Original file line number Diff line number Diff line change
Expand Up @@ -183,7 +183,7 @@ class InvalidChunk(RuntimeError):
class PartialRowsData(object):
"""Convenience wrapper for consuming a ``ReadRows`` streaming response.
:type response_iterator: :class:`grpc._channel._Rendezvous`
:type response_iterator: :class:`~google.cloud.exceptions.GrpcRendezvous`
:param response_iterator: A streaming iterator returned from a
``ReadRows`` request.
"""
Expand Down
2 changes: 1 addition & 1 deletion google/cloud/bigtable/table.py
Original file line number Diff line number Diff line change
Expand Up @@ -302,7 +302,7 @@ def sample_row_keys(self):
samples would require space roughly equal to the difference in their
``offset_bytes`` fields.
:rtype: :class:`grpc._channel._Rendezvous`
:rtype: :class:`~google.cloud.exceptions.GrpcRendezvous`
:returns: A cancel-able iterator. Can be consumed by calling ``next()``
or by casting to a :class:`list` and can be cancelled by
calling ``cancel()``.
Expand Down
5 changes: 2 additions & 3 deletions google/cloud/datastore/connection.py
Original file line number Diff line number Diff line change
Expand Up @@ -24,18 +24,17 @@
from google.cloud.environment_vars import DISABLE_GRPC
from google.cloud.environment_vars import GCD_HOST
from google.cloud.exceptions import Conflict
from google.cloud.exceptions import GrpcRendezvous
from google.cloud.exceptions import make_exception
from google.cloud.datastore._generated import datastore_pb2 as _datastore_pb2
# pylint: disable=ungrouped-imports
try:
from grpc import StatusCode
from grpc._channel import _Rendezvous
from google.cloud.datastore._generated import datastore_grpc_pb2
except ImportError: # pragma: NO COVER
_HAVE_GRPC = False
datastore_grpc_pb2 = None
StatusCode = None
_Rendezvous = Exception
else:
_HAVE_GRPC = True
# pylint: enable=ungrouped-imports
Expand Down Expand Up @@ -313,7 +312,7 @@ def commit(self, project, request_pb):
request_pb.project_id = project
try:
return self._stub.Commit(request_pb)
except _Rendezvous as exc:
except GrpcRendezvous as exc:
if exc.code() == StatusCode.ABORTED:
raise Conflict(exc.details())
raise
Expand Down
11 changes: 11 additions & 0 deletions google/cloud/exceptions.py
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,17 @@

_HTTP_CODE_TO_EXCEPTION = {} # populated at end of module

try:
from grpc._channel import _Rendezvous
except ImportError: # pragma: NO COVER
_Rendezvous = None


# pylint: disable=invalid-name
GrpcRendezvous = _Rendezvous
"""Exception class raised by gRPC stable."""
# pylint: enable=invalid-name


class GoogleCloudError(Exception):
"""Base error class for Google Cloud errors (abstract).
Expand Down
39 changes: 19 additions & 20 deletions google/cloud/logging/_gax.py
Original file line number Diff line number Diff line change
Expand Up @@ -18,7 +18,6 @@

from google.gax import CallOptions
from google.gax import INITIAL_PAGE
from google.gax.errors import GaxError
from google.logging.type.log_severity_pb2 import LogSeverity
from google.logging.v2.logging_config_pb2 import LogSink
from google.logging.v2.logging_metrics_pb2 import LogMetric
Expand All @@ -29,8 +28,8 @@
# pylint: disable=ungrouped-imports
from google.cloud._helpers import _datetime_to_pb_timestamp
from google.cloud._helpers import _pb_timestamp_to_rfc3339
from google.cloud._helpers import exc_to_code
from google.cloud.exceptions import Conflict
from google.cloud.exceptions import GrpcRendezvous
from google.cloud.exceptions import NotFound
# pylint: enable=ungrouped-imports

Expand Down Expand Up @@ -123,8 +122,8 @@ def logger_delete(self, project, logger_name):
path = 'projects/%s/logs/%s' % (project, logger_name)
try:
self._gax_api.delete_log(path, options)
except GaxError as exc:
if exc_to_code(exc.cause) == StatusCode.NOT_FOUND:
except GrpcRendezvous as exc:
if exc.code() == StatusCode.NOT_FOUND:
raise NotFound(path)
raise

Expand Down Expand Up @@ -195,8 +194,8 @@ def sink_create(self, project, sink_name, filter_, destination):
destination=destination)
try:
self._gax_api.create_sink(parent, sink_pb, options)
except GaxError as exc:
if exc_to_code(exc.cause) == StatusCode.FAILED_PRECONDITION:
except GrpcRendezvous as exc:
if exc.code() == StatusCode.FAILED_PRECONDITION:
path = 'projects/%s/sinks/%s' % (project, sink_name)
raise Conflict(path)
raise
Expand All @@ -218,8 +217,8 @@ def sink_get(self, project, sink_name):
path = 'projects/%s/sinks/%s' % (project, sink_name)
try:
sink_pb = self._gax_api.get_sink(path, options)
except GaxError as exc:
if exc_to_code(exc.cause) == StatusCode.NOT_FOUND:
except GrpcRendezvous as exc:
if exc.code() == StatusCode.NOT_FOUND:
raise NotFound(path)
raise
return _log_sink_pb_to_mapping(sink_pb)
Expand Down Expand Up @@ -250,8 +249,8 @@ def sink_update(self, project, sink_name, filter_, destination):
sink_pb = LogSink(name=path, filter=filter_, destination=destination)
try:
self._gax_api.update_sink(path, sink_pb, options)
except GaxError as exc:
if exc_to_code(exc.cause) == StatusCode.NOT_FOUND:
except GrpcRendezvous as exc:
if exc.code() == StatusCode.NOT_FOUND:
raise NotFound(path)
raise
return _log_sink_pb_to_mapping(sink_pb)
Expand All @@ -269,8 +268,8 @@ def sink_delete(self, project, sink_name):
path = 'projects/%s/sinks/%s' % (project, sink_name)
try:
self._gax_api.delete_sink(path, options)
except GaxError as exc:
if exc_to_code(exc.cause) == StatusCode.NOT_FOUND:
except GrpcRendezvous as exc:
if exc.code() == StatusCode.NOT_FOUND:
raise NotFound(path)
raise

Expand Down Expand Up @@ -340,8 +339,8 @@ def metric_create(self, project, metric_name, filter_, description):
description=description)
try:
self._gax_api.create_log_metric(parent, metric_pb, options)
except GaxError as exc:
if exc_to_code(exc.cause) == StatusCode.FAILED_PRECONDITION:
except GrpcRendezvous as exc:
if exc.code() == StatusCode.FAILED_PRECONDITION:
path = 'projects/%s/metrics/%s' % (project, metric_name)
raise Conflict(path)
raise
Expand All @@ -363,8 +362,8 @@ def metric_get(self, project, metric_name):
path = 'projects/%s/metrics/%s' % (project, metric_name)
try:
metric_pb = self._gax_api.get_log_metric(path, options)
except GaxError as exc:
if exc_to_code(exc.cause) == StatusCode.NOT_FOUND:
except GrpcRendezvous as exc:
if exc.code() == StatusCode.NOT_FOUND:
raise NotFound(path)
raise
return _log_metric_pb_to_mapping(metric_pb)
Expand Down Expand Up @@ -395,8 +394,8 @@ def metric_update(self, project, metric_name, filter_, description):
description=description)
try:
self._gax_api.update_log_metric(path, metric_pb, options)
except GaxError as exc:
if exc_to_code(exc.cause) == StatusCode.NOT_FOUND:
except GrpcRendezvous as exc:
if exc.code() == StatusCode.NOT_FOUND:
raise NotFound(path)
raise
return _log_metric_pb_to_mapping(metric_pb)
Expand All @@ -414,8 +413,8 @@ def metric_delete(self, project, metric_name):
path = 'projects/%s/metrics/%s' % (project, metric_name)
try:
self._gax_api.delete_log_metric(path, options)
except GaxError as exc:
if exc_to_code(exc.cause) == StatusCode.NOT_FOUND:
except GrpcRendezvous as exc:
if exc.code() == StatusCode.NOT_FOUND:
raise NotFound(path)
raise

Expand Down
57 changes: 28 additions & 29 deletions google/cloud/pubsub/_gax.py
Original file line number Diff line number Diff line change
Expand Up @@ -18,17 +18,16 @@
from google.cloud.gapic.pubsub.v1.subscriber_api import SubscriberApi
from google.gax import CallOptions
from google.gax import INITIAL_PAGE
from google.gax.errors import GaxError
from google.pubsub.v1.pubsub_pb2 import PubsubMessage
from google.pubsub.v1.pubsub_pb2 import PushConfig
from grpc.beta.implementations import insecure_channel
from grpc import insecure_channel
from grpc import StatusCode

# pylint: disable=ungrouped-imports
from google.cloud._helpers import _to_bytes
from google.cloud._helpers import exc_to_code
from google.cloud._helpers import _pb_timestamp_to_rfc3339
from google.cloud.exceptions import Conflict
from google.cloud.exceptions import GrpcRendezvous
from google.cloud.exceptions import NotFound
# pylint: enable=ungrouped-imports

Expand Down Expand Up @@ -93,8 +92,8 @@ def topic_create(self, topic_path):
"""
try:
topic_pb = self._gax_api.create_topic(topic_path)
except GaxError as exc:
if exc_to_code(exc.cause) == StatusCode.FAILED_PRECONDITION:
except GrpcRendezvous as exc:
if exc.code() == StatusCode.FAILED_PRECONDITION:
raise Conflict(topic_path)
raise
return {'name': topic_pb.name}
Expand All @@ -116,8 +115,8 @@ def topic_get(self, topic_path):
"""
try:
topic_pb = self._gax_api.get_topic(topic_path)
except GaxError as exc:
if exc_to_code(exc.cause) == StatusCode.NOT_FOUND:
except GrpcRendezvous as exc:
if exc.code() == StatusCode.NOT_FOUND:
raise NotFound(topic_path)
raise
return {'name': topic_pb.name}
Expand All @@ -134,8 +133,8 @@ def topic_delete(self, topic_path):
"""
try:
self._gax_api.delete_topic(topic_path)
except GaxError as exc:
if exc_to_code(exc.cause) == StatusCode.NOT_FOUND:
except GrpcRendezvous as exc:
if exc.code() == StatusCode.NOT_FOUND:
raise NotFound(topic_path)
raise

Expand Down Expand Up @@ -163,8 +162,8 @@ def topic_publish(self, topic_path, messages):
try:
result = self._gax_api.publish(topic_path, message_pbs,
options=options)
except GaxError as exc:
if exc_to_code(exc.cause) == StatusCode.NOT_FOUND:
except GrpcRendezvous as exc:
if exc.code() == StatusCode.NOT_FOUND:
raise NotFound(topic_path)
raise
return result.message_ids
Expand Down Expand Up @@ -201,8 +200,8 @@ def topic_list_subscriptions(self, topic_path, page_size=0,
try:
page_iter = self._gax_api.list_topic_subscriptions(
topic_path, page_size=page_size, options=options)
except GaxError as exc:
if exc_to_code(exc.cause) == StatusCode.NOT_FOUND:
except GrpcRendezvous as exc:
if exc.code() == StatusCode.NOT_FOUND:
raise NotFound(topic_path)
raise
subs = page_iter.next()
Expand Down Expand Up @@ -294,8 +293,8 @@ def subscription_create(self, subscription_path, topic_path,
try:
sub_pb = self._gax_api.create_subscription(
subscription_path, topic_path, push_config, ack_deadline)
except GaxError as exc:
if exc_to_code(exc.cause) == StatusCode.FAILED_PRECONDITION:
except GrpcRendezvous as exc:
if exc.code() == StatusCode.FAILED_PRECONDITION:
raise Conflict(topic_path)
raise
return _subscription_pb_to_mapping(sub_pb)
Expand All @@ -316,8 +315,8 @@ def subscription_get(self, subscription_path):
"""
try:
sub_pb = self._gax_api.get_subscription(subscription_path)
except GaxError as exc:
if exc_to_code(exc.cause) == StatusCode.NOT_FOUND:
except GrpcRendezvous as exc:
if exc.code() == StatusCode.NOT_FOUND:
raise NotFound(subscription_path)
raise
return _subscription_pb_to_mapping(sub_pb)
Expand All @@ -335,8 +334,8 @@ def subscription_delete(self, subscription_path):
"""
try:
self._gax_api.delete_subscription(subscription_path)
except GaxError as exc:
if exc_to_code(exc.cause) == StatusCode.NOT_FOUND:
except GrpcRendezvous as exc:
if exc.code() == StatusCode.NOT_FOUND:
raise NotFound(subscription_path)
raise

Expand All @@ -360,8 +359,8 @@ def subscription_modify_push_config(self, subscription_path,
push_config = PushConfig(push_endpoint=push_endpoint)
try:
self._gax_api.modify_push_config(subscription_path, push_config)
except GaxError as exc:
if exc_to_code(exc.cause) == StatusCode.NOT_FOUND:
except GrpcRendezvous as exc:
if exc.code() == StatusCode.NOT_FOUND:
raise NotFound(subscription_path)
raise

Expand Down Expand Up @@ -392,8 +391,8 @@ def subscription_pull(self, subscription_path, return_immediately=False,
try:
response_pb = self._gax_api.pull(
subscription_path, max_messages, return_immediately)
except GaxError as exc:
if exc_to_code(exc.cause) == StatusCode.NOT_FOUND:
except GrpcRendezvous as exc:
if exc.code() == StatusCode.NOT_FOUND:
raise NotFound(subscription_path)
raise
return [_received_message_pb_to_mapping(rmpb)
Expand All @@ -415,8 +414,8 @@ def subscription_acknowledge(self, subscription_path, ack_ids):
"""
try:
self._gax_api.acknowledge(subscription_path, ack_ids)
except GaxError as exc:
if exc_to_code(exc.cause) == StatusCode.NOT_FOUND:
except GrpcRendezvous as exc:
if exc.code() == StatusCode.NOT_FOUND:
raise NotFound(subscription_path)
raise

Expand All @@ -442,8 +441,8 @@ def subscription_modify_ack_deadline(self, subscription_path, ack_ids,
try:
self._gax_api.modify_ack_deadline(
subscription_path, ack_ids, ack_deadline)
except GaxError as exc:
if exc_to_code(exc.cause) == StatusCode.NOT_FOUND:
except GrpcRendezvous as exc:
if exc.code() == StatusCode.NOT_FOUND:
raise NotFound(subscription_path)
raise

Expand Down Expand Up @@ -520,7 +519,7 @@ def make_gax_publisher_api(connection):
"""
channel = None
if connection.in_emulator:
channel = insecure_channel(connection.host, None)
channel = insecure_channel(connection.host)
return PublisherApi(channel=channel)


Expand All @@ -540,5 +539,5 @@ def make_gax_subscriber_api(connection):
"""
channel = None
if connection.in_emulator:
channel = insecure_channel(connection.host, None)
channel = insecure_channel(connection.host)
return SubscriberApi(channel=channel)
5 changes: 3 additions & 2 deletions system_tests/bigtable.py
Original file line number Diff line number Diff line change
Expand Up @@ -89,10 +89,11 @@ def _retry_on_unavailable(exc):


def setUpModule():
from grpc._channel import _Rendezvous
from google.cloud.exceptions import GrpcRendezvous

Config.CLIENT = Client(admin=True)
Config.INSTANCE = Config.CLIENT.instance(INSTANCE_ID, LOCATION_ID)
retry = RetryErrors(_Rendezvous, error_predicate=_retry_on_unavailable)
retry = RetryErrors(GrpcRendezvous, error_predicate=_retry_on_unavailable)
instances, failed_locations = retry(Config.CLIENT.list_instances)()

if len(failed_locations) != 0:
Expand Down
Loading

0 comments on commit 6fb2d73

Please sign in to comment.