Skip to content
1 change: 0 additions & 1 deletion eng/tox/allowed_pylint_failures.py
Original file line number Diff line number Diff line change
Expand Up @@ -39,7 +39,6 @@
"azure-eventgrid",
"azure-graphrbac",
"azure-loganalytics",
"azure-servicebus",
"azure-servicefabric",
"azure-template",
"azure-keyvault",
Expand Down
1 change: 1 addition & 0 deletions eng/tox/mypy_hard_failure_packages.py
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,7 @@
MYPY_HARD_FAILURE_OPTED = [
"azure-core",
"azure-eventhub",
"azure-servicebus",
"azure-ai-textanalytics",
"azure-ai-formrecognizer"
]
2 changes: 1 addition & 1 deletion sdk/servicebus/azure-servicebus/azure/__init__.py
Original file line number Diff line number Diff line change
@@ -1 +1 @@
__path__ = __import__('pkgutil').extend_path(__path__, __name__)
__path__ = __import__('pkgutil').extend_path(__path__, __name__) # type: ignore
81 changes: 42 additions & 39 deletions sdk/servicebus/azure-servicebus/azure/servicebus/_base_handler.py
Original file line number Diff line number Diff line change
Expand Up @@ -7,7 +7,7 @@
import uuid
import time
from datetime import timedelta
from typing import cast, Optional, Tuple, TYPE_CHECKING, Dict, Any
from typing import cast, Optional, Tuple, TYPE_CHECKING, Dict, Any, Callable, Type

try:
from urllib import quote_plus # type: ignore
Expand Down Expand Up @@ -90,6 +90,31 @@ def _generate_sas_token(uri, policy, key, expiry=None):
return _AccessToken(token=token, expires_on=abs_expiry)


def _convert_connection_string_to_kwargs(conn_str, shared_key_credential_type, **kwargs):
# type: (str, Type, Any) -> Dict[str, Any]
host, policy, key, entity_in_conn_str = _parse_conn_str(conn_str)
queue_name = kwargs.get("queue_name")
topic_name = kwargs.get("topic_name")
if not (queue_name or topic_name or entity_in_conn_str):
raise ValueError("Entity name is missing. Please specify `queue_name` or `topic_name`"
" or use a connection string including the entity information.")

if queue_name and topic_name:
raise ValueError("`queue_name` and `topic_name` can not be specified simultaneously.")

entity_in_kwargs = queue_name or topic_name
if entity_in_conn_str and entity_in_kwargs and (entity_in_conn_str != entity_in_kwargs):
raise ServiceBusAuthorizationError(
"Entity names do not match, the entity name in connection string is {};"
" the entity name in parameter is {}.".format(entity_in_conn_str, entity_in_kwargs)
)

kwargs["fully_qualified_namespace"] = host

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Last dumb nitpick: why not used a named tuple for this, if we're returning a structured and constrained set of values? (I've always had a dislike for arbitrary dicts because you get so little guidance on "what to expect/what's correct")

kwargs["entity_name"] = entity_in_conn_str or entity_in_kwargs
kwargs["credential"] = shared_key_credential_type(policy, key)
return kwargs


class ServiceBusSharedKeyCredential(object):
"""The shared access key credential used for authentication.

Expand All @@ -110,14 +135,15 @@ def get_token(self, *scopes, **kwargs): # pylint:disable=unused-argument
return _generate_sas_token(scopes[0], self.policy, self.key)


class BaseHandler(object): # pylint:disable=too-many-instance-attributes
class BaseHandler: # pylint:disable=too-many-instance-attributes
def __init__(
self,
fully_qualified_namespace,
entity_name,
credential,
**kwargs
):
# type: (str, str, TokenCredential, Any) -> None
self.fully_qualified_namespace = fully_qualified_namespace
self._entity_name = entity_name

Expand All @@ -128,7 +154,7 @@ def __init__(
self._container_id = CONTAINER_PREFIX + str(uuid.uuid4())[:8]
self._config = Configuration(**kwargs)
self._running = False
self._handler = None
self._handler = None # type: uamqp.AMQPClient
self._auth_uri = None
self._properties = create_properties()

Expand All @@ -140,6 +166,7 @@ def __exit__(self, *args):
self.close()

def _handle_exception(self, exception):
# type: (BaseException) -> ServiceBusError
error, error_need_close_handler, error_need_raise = _create_servicebus_exception(_LOGGER, exception, self)
if error_need_close_handler:
self._close_handler()
Expand All @@ -148,38 +175,14 @@ def _handle_exception(self, exception):

return error

@staticmethod
def _from_connection_string(conn_str, **kwargs):
# type: (str, Any) -> Dict[str, Any]
host, policy, key, entity_in_conn_str = _parse_conn_str(conn_str)
queue_name = kwargs.get("queue_name")
topic_name = kwargs.get("topic_name")
if not (queue_name or topic_name or entity_in_conn_str):
raise ValueError("Entity name is missing. Please specify `queue_name` or `topic_name`"
" or use a connection string including the entity information.")

if queue_name and topic_name:
raise ValueError("`queue_name` and `topic_name` can not be specified simultaneously.")

entity_in_kwargs = queue_name or topic_name
if entity_in_conn_str and entity_in_kwargs and (entity_in_conn_str != entity_in_kwargs):
raise ServiceBusAuthorizationError(
"Entity names do not match, the entity name in connection string is {};"
" the entity name in parameter is {}.".format(entity_in_conn_str, entity_in_kwargs)
)

kwargs["fully_qualified_namespace"] = host
kwargs["entity_name"] = entity_in_conn_str or entity_in_kwargs
kwargs["credential"] = ServiceBusSharedKeyCredential(policy, key)
return kwargs

def _backoff(
self,
retried_times,
last_exception,
timeout=None,
entity_name=None
):
# type: (int, Exception, Optional[float], str) -> None
entity_name = entity_name or self._container_id
backoff = self._config.retry_backoff_factor * 2 ** retried_times
if backoff <= self._config.retry_backoff_max and (
Expand All @@ -200,40 +203,39 @@ def _backoff(
raise last_exception

def _do_retryable_operation(self, operation, timeout=None, **kwargs):
# type: (Callable, Optional[float], Any) -> Any
require_last_exception = kwargs.pop("require_last_exception", False)
require_timeout = kwargs.pop("require_timeout", False)
retried_times = 0
last_exception = None
max_retries = self._config.retry_total

while retried_times <= max_retries:
try:
if require_last_exception:
kwargs["last_exception"] = last_exception
if require_timeout:
kwargs["timeout"] = timeout
return operation(**kwargs)
except StopIteration:
raise
except Exception as exception: # pylint: disable=broad-except
last_exception = self._handle_exception(exception)
if require_last_exception:
kwargs["last_exception"] = last_exception
retried_times += 1
if retried_times > max_retries:
break
_LOGGER.info(
"%r operation has exhausted retry. Last exception: %r.",
self._container_id,
last_exception,
)
raise last_exception
self._backoff(
retried_times=retried_times,
last_exception=last_exception,
timeout=timeout
)

_LOGGER.info(
"%r operation has exhausted retry. Last exception: %r.",
self._container_id,
last_exception,
)
raise last_exception

def _mgmt_request_response(self, mgmt_operation, message, callback, keep_alive_associated_link=True, **kwargs):
# type: (str, uamqp.Message, Callable, bool, Any) -> uamqp.Message
self._open()
application_properties = {}
# Some mgmt calls do not support an associated link name (such as list_sessions). Most do, so on by default.
Expand Down Expand Up @@ -265,6 +267,7 @@ def _mgmt_request_response(self, mgmt_operation, message, callback, keep_alive_a
raise ServiceBusError("Management request failed: {}".format(exp), exp)

def _mgmt_request_response_with_retry(self, mgmt_operation, message, callback, **kwargs):
# type: (bytes, Dict[str, Any], Callable, Any) -> Any
return self._do_retryable_operation(
self._mgmt_request_response,
mgmt_operation=mgmt_operation,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -8,8 +8,8 @@
import uuid
import requests
try:
from urlparse import urlparse
from urllib import unquote_plus
from urlparse import urlparse # type: ignore
from urllib import unquote_plus # type: ignore
except ImportError:
from urllib.parse import urlparse
from urllib.parse import unquote_plus
Expand Down
Loading