diff --git a/sdk/servicebus/azure-servicebus/README.md b/sdk/servicebus/azure-servicebus/README.md index 36319b7f146c..67ee777c5f10 100644 --- a/sdk/servicebus/azure-servicebus/README.md +++ b/sdk/servicebus/azure-servicebus/README.md @@ -158,9 +158,9 @@ with ServiceBusClient.from_connection_string(connstr) as client: ### Receive messages from a queue -To receive from a queue, you can either perform an ad-hoc receive via "receiver.receive_messages()" or receive persistently through the receiver itself. +To receive from a queue, you can either perform an ad-hoc receive via `receiver.receive_messages()` or receive persistently through the receiver itself. -#### Receive messages from a queue through iterating over ServiceBusReceiver +#### [Receive messages from a queue through iterating over ServiceBusReceiver][streaming_receive_reference] ```Python from azure.servicebus import ServiceBusClient @@ -173,7 +173,7 @@ with ServiceBusClient.from_connection_string(connstr) as client: # max_wait_time specifies how long the receiver should wait with no incoming messages before stopping receipt. # Default is None; to receive forever. with client.get_queue_receiver(queue_name, max_wait_time=30) as receiver: - for msg in receiver: # ServiceBusReceiver instance is a generator + for msg in receiver: # ServiceBusReceiver instance is a generator. This is equivilent to get_streaming_message_iter(). print(str(msg)) # If it is desired to halt receiving early, one can break out of the loop here safely. ``` @@ -183,7 +183,7 @@ with ServiceBusClient.from_connection_string(connstr) as client: > See [AutoLockRenewer](#autolockrenew) for a helper to perform this in the background automatically. > Lock duration is set in Azure on the queue or topic itself. -#### [Receive messages from a queue through `ServiceBusReceiver.receive_messages()`][receive_reference] +#### [Receive messages from a queue through ServiceBusReceiver.receive_messages()][receive_reference] > **NOTE:** `ServiceBusReceiver.receive_messages()` receives a single or constrained list of messages through an ad-hoc method call, as opposed to receiving perpetually from the generator. It always returns a list. @@ -446,14 +446,14 @@ contact [opencode@microsoft.com](mailto:opencode@microsoft.com) with any additio [service_bus_overview]: https://docs.microsoft.com/azure/service-bus-messaging/service-bus-messaging-overview [queue_status_codes]: https://docs.microsoft.com/rest/api/servicebus/create-queue#response-codes [service_bus_docs]: https://docs.microsoft.com/azure/service-bus/ -[service_bus_mgmt_docs]: https://docs.microsoft.com/en-us/python/api/overview/azure/servicebus/management?view=azure-python -[queue_concept]: https://docs.microsoft.com/en-us/azure/service-bus-messaging/service-bus-messaging-overview#queues -[topic_concept]: https://docs.microsoft.com/en-us/azure/service-bus-messaging/service-bus-messaging-overview#topics -[subscription_concept]: https://docs.microsoft.com/en-us/azure/service-bus-messaging/service-bus-queues-topics-subscriptions#topics-and-subscriptions -[azure_namespace_creation]: https://docs.microsoft.com/en-us/azure/service-bus-messaging/service-bus-create-namespace-portal +[service_bus_mgmt_docs]: https://docs.microsoft.com/python/api/overview/azure/servicebus/management?view=azure-python +[queue_concept]: https://docs.microsoft.com/azure/service-bus-messaging/service-bus-messaging-overview#queues +[topic_concept]: https://docs.microsoft.com/azure/service-bus-messaging/service-bus-messaging-overview#topics +[subscription_concept]: https://docs.microsoft.com/azure/service-bus-messaging/service-bus-queues-topics-subscriptions#topics-and-subscriptions +[azure_namespace_creation]: https://docs.microsoft.com/azure/service-bus-messaging/service-bus-create-namespace-portal [servicebus_management_repository]: https://github.com/Azure/azure-sdk-for-python/tree/master/sdk/servicebus/azure-mgmt-servicebus -[get_servicebus_conn_str]: https://docs.microsoft.com/en-us/azure/service-bus-messaging/service-bus-create-namespace-portal#get-the-connection-string -[servicebus_aad_authentication]: https://docs.microsoft.com/en-us/azure/service-bus-messaging/service-bus-authentication-and-authorization +[get_servicebus_conn_str]: https://docs.microsoft.com/azure/service-bus-messaging/service-bus-create-namespace-portal#get-the-connection-string +[servicebus_aad_authentication]: https://docs.microsoft.com/azure/service-bus-messaging/service-bus-authentication-and-authorization [token_credential_interface]: ../../core/azure-core/azure/core/credentials.py [pypi_azure_identity]: https://pypi.org/project/azure-identity/ [message_reference]: https://azuresdkdocs.blob.core.windows.net/$web/python/azure-servicebus/latest/azure.servicebus.html#azure.servicebus.Message @@ -462,6 +462,7 @@ contact [opencode@microsoft.com](mailto:opencode@microsoft.com) with any additio [client_reference]: https://azuresdkdocs.blob.core.windows.net/$web/python/azure-servicebus/latest/azure.servicebus.html#azure.servicebus.ServiceBusClient [send_reference]: https://azuresdkdocs.blob.core.windows.net/$web/python/azure-servicebus/latest/azure.servicebus.html?highlight=send_messages#azure.servicebus.ServiceBusSender.send_messages [receive_reference]: https://azuresdkdocs.blob.core.windows.net/$web/python/azure-servicebus/latest/azure.servicebus.html?highlight=receive#azure.servicebus.ServiceBusReceiver.receive_messages +[streaming_receive_reference]: https://azuresdkdocs.blob.core.windows.net/$web/python/azure-servicebus/latest/azure.servicebus.html?highlight=get_streaming_message_iter#azure.servicebus.ServiceBusReceiver.get_streaming_message_iter [session_receive_reference]: https://azuresdkdocs.blob.core.windows.net/$web/python/azure-servicebus/latest/azure.servicebus.html?highlight=receive#azure.servicebus.ServiceBusSessionReceiver.receive_messages [session_send_reference]: https://azuresdkdocs.blob.core.windows.net/$web/python/azure-servicebus/latest/azure.servicebus.html?highlight=session_id#azure.servicebus.Message.session_id [complete_reference]: https://azuresdkdocs.blob.core.windows.net/$web/python/azure-servicebus/latest/azure.servicebus.html?highlight=complete#azure.servicebus.ReceivedMessage.complete diff --git a/sdk/servicebus/azure-servicebus/azure/servicebus/_common/auto_lock_renewer.py b/sdk/servicebus/azure-servicebus/azure/servicebus/_common/auto_lock_renewer.py index cb9a939f6c01..f1744342bd68 100644 --- a/sdk/servicebus/azure-servicebus/azure/servicebus/_common/auto_lock_renewer.py +++ b/sdk/servicebus/azure-servicebus/azure/servicebus/_common/auto_lock_renewer.py @@ -53,6 +53,17 @@ class AutoLockRenew(object): """ def __init__(self, executor=None, max_workers=None): + # type: (ThreadPoolExecutor, int) -> None + """Auto renew locks for messages and sessions using a background thread pool. + + :param executor: A user-specified thread pool. This cannot be combined with + setting `max_workers`. + :type executor: ~concurrent.futures.ThreadPoolExecutor + :param max_workers: Specify the maximum workers in the thread pool. If not + specified the number used will be derived from the core count of the environment. + This cannot be combined with `executor`. + :type max_workers: int + """ self._executor = executor or ThreadPoolExecutor(max_workers=max_workers) self._shutdown = threading.Event() self._sleep_time = 1 @@ -109,16 +120,18 @@ def _auto_lock_renew(self, renewable, starttime, timeout, on_lock_renew_failure= on_lock_renew_failure(renewable, error) def register(self, renewable, timeout=300, on_lock_renew_failure=None): + # type: (Union[ReceivedMessage, ServiceBusSession], float, Optional[LockRenewFailureCallback]) -> None """Register a renewable entity for automatic lock renewal. :param renewable: A locked entity that needs to be renewed. - :type renewable: ~azure.servicebus.ReceivedMessage or - ~azure.servicebus.ServiceBusSession - :param float timeout: A time in seconds that the lock should be maintained for. - Default value is 300 (5 minutes). - :param Optional[LockRenewFailureCallback] on_lock_renew_failure: - A callback may be specified to be called when the lock is lost on the renewable that is being registered. - Default value is None (no callback). + :type renewable: Union[~azure.servicebus.ReceivedMessage, ~azure.servicebus.ServiceBusSession] + :param timeout: A time in seconds that the lock should be maintained for. Default value is 300 (5 minutes). + :type timeout: float + :param on_lock_renew_failure: A callback may be specified to be called when the lock is lost on the renewable + that is being registered. Default value is None (no callback). + :type on_lock_renew_failure: Optional[LockRenewFailureCallback] + + :rtype: None """ if self._shutdown.is_set(): raise ServiceBusError("The AutoLockRenew has already been shutdown. Please create a new instance for" @@ -131,6 +144,8 @@ def close(self, wait=True): :param wait: Whether to block until thread pool has shutdown. Default is `True`. :type wait: bool + + :rtype: None """ self._shutdown.set() self._executor.shutdown(wait=wait) diff --git a/sdk/servicebus/azure-servicebus/azure/servicebus/_common/message.py b/sdk/servicebus/azure-servicebus/azure/servicebus/_common/message.py index 8d05221c8322..335c5a3be1a3 100644 --- a/sdk/servicebus/azure-servicebus/azure/servicebus/_common/message.py +++ b/sdk/servicebus/azure-servicebus/azure/servicebus/_common/message.py @@ -9,7 +9,7 @@ import uuid import functools import logging -from typing import Optional, List, Union, Iterable, TYPE_CHECKING, Callable +from typing import Optional, List, Union, Iterable, TYPE_CHECKING, Callable, Any import uamqp.message @@ -52,7 +52,8 @@ MessageLockExpired, SessionLockExpired, MessageSettleFailed, - MessageContentTooLarge) + MessageContentTooLarge, + ServiceBusError) from .utils import utc_from_timestamp, utc_now, copy_messages_to_sendable_if_needed if TYPE_CHECKING: from .._servicebus_receiver import ServiceBusReceiver @@ -65,7 +66,7 @@ class Message(object): # pylint: disable=too-many-public-methods,too-many-insta """A Service Bus Message. :param body: The data to send in a single message. - :type body: str or bytes + :type body: Union[str, bytes] :keyword dict properties: The user defined properties on the message. :keyword str session_id: The session identifier of the message for a sessionful entity. @@ -95,6 +96,7 @@ class Message(object): # pylint: disable=too-many-public-methods,too-many-insta """ def __init__(self, body, **kwargs): + # type: (Union[str, bytes], Any) -> None # Although we might normally thread through **kwargs this causes # problems as MessageProperties won't absorb spurious args. self._encoding = kwargs.pop("encoding", 'UTF-8') @@ -491,7 +493,6 @@ class BatchMessage(object): :vartype message: ~uamqp.BatchMessage :param int max_size_in_bytes: The maximum size of bytes data that a BatchMessage object can hold. - """ def __init__(self, max_size_in_bytes=None): # type: (Optional[int]) -> None @@ -570,11 +571,11 @@ class PeekMessage(Message): This message is still on the queue, and unlocked. A peeked message cannot be completed, abandoned, dead-lettered or deferred. It has no lock token or expiry. - """ def __init__(self, message): - super(PeekMessage, self).__init__(None, message=message) + # type: (uamqp.message.Message) -> None + super(PeekMessage, self).__init__(None, message=message) # type: ignore def _to_outgoing_message(self): # type: () -> Message @@ -741,12 +742,17 @@ class ReceivedMessageBase(PeekMessage): """ def __init__(self, message, mode=ReceiveSettleMode.PeekLock, **kwargs): + # type: (uamqp.message.Message, ReceiveSettleMode, Any) -> None super(ReceivedMessageBase, self).__init__(message=message) self._settled = (mode == ReceiveSettleMode.ReceiveAndDelete) self._received_timestamp_utc = utc_now() self._is_deferred_message = kwargs.get("is_deferred_message", False) - self.auto_renew_error = None - self._receiver = None # type: ignore + self.auto_renew_error = None # type: Optional[Exception] + try: + self._receiver = kwargs.pop("receiver") # type: Union[ServiceBusReceiver, ServiceBusSessionReceiver] + except KeyError: + raise TypeError("ReceivedMessage requires a receiver to be initialized. This class should never be" + \ + "initialized by a user; the Message class should be utilized instead.") self._expiry = None def _check_live(self, action): @@ -769,6 +775,7 @@ def _check_live(self, action): def _settle_via_mgmt_link(self, settle_operation, dead_letter_reason=None, dead_letter_description=None): # type: (str, Optional[str], Optional[str]) -> Callable # pylint: disable=protected-access + if settle_operation == MESSAGE_COMPLETE: return functools.partial( self._receiver._settle_message, @@ -822,13 +829,14 @@ def _settle_via_receiver_link(self, settle_operation, dead_letter_reason=None, d @property def _lock_expired(self): # type: () -> bool + # pylint: disable=protected-access """ Whether the lock on the message has expired. :rtype: bool """ try: - if self._receiver.session: # pylint: disable=protected-access + if self._receiver.session: # type: ignore raise TypeError("Session messages do not expire. Please use the Session expiry instead.") except AttributeError: # Is not a session receiver pass @@ -859,6 +867,7 @@ def lock_token(self): @property def locked_until_utc(self): # type: () -> Optional[datetime.datetime] + # pylint: disable=protected-access """ The UTC datetime until which the message will be locked in the queue/subscription. When the lock expires, delivery count of hte message is incremented and the message @@ -867,7 +876,7 @@ def locked_until_utc(self): :rtype: datetime.datetime """ try: - if self._settled or self._receiver.session: # pylint: disable=protected-access + if self._settled or self._receiver.session: # type: ignore return None except AttributeError: # not settled, and isn't session receiver. pass @@ -1021,6 +1030,7 @@ def defer(self): def renew_lock(self): # type: () -> None + # pylint: disable=protected-access,no-member """Renew the message lock. This will maintain the lock on the message to ensure it is not returned to the queue @@ -1041,7 +1051,7 @@ def renew_lock(self): :raises: ~azure.servicebus.exceptions.MessageAlreadySettled is message has already been settled. """ try: - if self._receiver.session: + if self._receiver.session: # type: ignore raise TypeError("Session messages cannot be renewed. Please renew the Session lock instead.") except AttributeError: pass @@ -1050,5 +1060,5 @@ def renew_lock(self): if not token: raise ValueError("Unable to renew lock - no lock token found.") - expiry = self._receiver._renew_locks(token) # pylint: disable=protected-access,no-member + expiry = self._receiver._renew_locks(token) # type: ignore self._expiry = utc_from_timestamp(expiry[MGMT_RESPONSE_MESSAGE_EXPIRATION][0]/1000.0) diff --git a/sdk/servicebus/azure-servicebus/azure/servicebus/_common/mgmt_handlers.py b/sdk/servicebus/azure-servicebus/azure/servicebus/_common/mgmt_handlers.py index 48ebdfe156ec..c2cfa70a5e5a 100644 --- a/sdk/servicebus/azure-servicebus/azure/servicebus/_common/mgmt_handlers.py +++ b/sdk/servicebus/azure-servicebus/azure/servicebus/_common/mgmt_handlers.py @@ -62,6 +62,7 @@ def deferred_message_op( status_code, message, description, + receiver, mode=ReceiveSettleMode.PeekLock, message_type=ReceivedMessage ): @@ -69,7 +70,7 @@ def deferred_message_op( parsed = [] for m in message.get_data()[b'messages']: wrapped = uamqp.Message.decode_from_bytes(bytearray(m[b'message'])) - parsed.append(message_type(wrapped, mode, is_deferred_message=True)) + parsed.append(message_type(wrapped, mode, is_deferred_message=True, receiver=receiver)) return parsed if status_code in [202, 204]: return [] diff --git a/sdk/servicebus/azure-servicebus/azure/servicebus/_common/receiver_mixins.py b/sdk/servicebus/azure-servicebus/azure/servicebus/_common/receiver_mixins.py index edd3404800cd..2a767e4795a6 100644 --- a/sdk/servicebus/azure-servicebus/azure/servicebus/_common/receiver_mixins.py +++ b/sdk/servicebus/azure-servicebus/azure/servicebus/_common/receiver_mixins.py @@ -51,8 +51,7 @@ def _populate_attributes(self, **kwargs): self._max_wait_time = kwargs.get("max_wait_time", None) def _build_message(self, received, message_type=ReceivedMessage): - message = message_type(message=received, mode=self._mode) - message._receiver = self # pylint: disable=protected-access + message = message_type(message=received, mode=self._mode, receiver=self) self._last_received_sequenced_number = message.sequence_number return message diff --git a/sdk/servicebus/azure-servicebus/azure/servicebus/_servicebus_client.py b/sdk/servicebus/azure-servicebus/azure/servicebus/_servicebus_client.py index c0ad711e2590..28f253bdfdf2 100644 --- a/sdk/servicebus/azure-servicebus/azure/servicebus/_servicebus_client.py +++ b/sdk/servicebus/azure-servicebus/azure/servicebus/_servicebus_client.py @@ -430,7 +430,7 @@ def get_subscription_deadletter_receiver(self, topic_name, subscription_name, ** ) def get_subscription_session_receiver(self, topic_name, subscription_name, session_id=None, **kwargs): - # type: (str, str, str, Any) -> ServiceBusReceiver + # type: (str, str, str, Any) -> ServiceBusSessionReceiver """Get ServiceBusReceiver for the specific subscription under the topic. :param str topic_name: The name of specific Service Bus Topic the client connects to. diff --git a/sdk/servicebus/azure-servicebus/azure/servicebus/_servicebus_receiver.py b/sdk/servicebus/azure-servicebus/azure/servicebus/_servicebus_receiver.py index 7a322e6b76a0..130ff7a77f7e 100644 --- a/sdk/servicebus/azure-servicebus/azure/servicebus/_servicebus_receiver.py +++ b/sdk/servicebus/azure-servicebus/azure/servicebus/_servicebus_receiver.py @@ -277,7 +277,7 @@ def _settle_message(self, settlement, lock_tokens, dead_letter_details=None): ) def _renew_locks(self, *lock_tokens): - # type: (*str) -> Any + # type: (str) -> Any message = {MGMT_REQUEST_LOCK_TOKENS: types.AMQPArray(lock_tokens)} return self._mgmt_request_response_with_retry( REQUEST_RESPONSE_RENEWLOCK_OPERATION, @@ -286,15 +286,25 @@ def _renew_locks(self, *lock_tokens): ) def get_streaming_message_iter(self, max_wait_time=None): + # type: (float) -> Iterator[ReceivedMessage] """Receive messages from an iterator indefinitely, or if a max_wait_time is specified, until such a timeout occurs. - :param float max_wait_time: Maximum time to wait in seconds for the next message to arrive. + :param max_wait_time: Maximum time to wait in seconds for the next message to arrive. If no messages arrive, and no timeout is specified, this call will not return until the connection is closed. If specified, and no messages arrive for the timeout period, the iterator will stop. + :type max_wait_time: float + :rtype: Iterator[ReceivedMessage] - :rtype Iterator[ReceivedMessage] + .. admonition:: Example: + + .. literalinclude:: ../samples/sync_samples/sample_code_servicebus.py + :start-after: [START receive_forever] + :end-before: [END receive_forever] + :language: python + :dedent: 4 + :caption: Receive indefinitely from an iterator in streaming fashion. """ return self._iter_contextual_wrapper(max_wait_time) @@ -308,6 +318,7 @@ def from_connection_string( """Create a ServiceBusReceiver from a connection string. :param conn_str: The connection string of a Service Bus. + :type conn_str: str :keyword str queue_name: The path of specific Service Bus Queue the client connects to. :keyword str topic_name: The path of specific Service Bus Topic which contains the Subscription the client connects to. @@ -384,7 +395,8 @@ def receive_messages(self, max_batch_size=None, max_wait_time=None): If no messages arrive, and no timeout is specified, this call will not return until the connection is closed. If specified, an no messages arrive within the timeout period, an empty list will be returned. - :rtype: list[~azure.servicebus.ReceivedMessage] + + :rtype: List[~azure.servicebus.ReceivedMessage] .. admonition:: Example: @@ -411,9 +423,9 @@ def receive_deferred_messages(self, sequence_numbers): When receiving deferred messages from a partitioned entity, all of the supplied sequence numbers must be messages from the same partition. - :param list[int] sequence_numbers: A list of the sequence numbers of messages that have been + :param List[int] sequence_numbers: A list of the sequence numbers of messages that have been deferred. - :rtype: list[~azure.servicebus.ReceivedMessage] + :rtype: List[~azure.servicebus.ReceivedMessage] .. admonition:: Example: @@ -440,14 +452,12 @@ def receive_deferred_messages(self, sequence_numbers): self._populate_message_properties(message) - handler = functools.partial(mgmt_handlers.deferred_message_op, mode=self._mode) + handler = functools.partial(mgmt_handlers.deferred_message_op, mode=self._mode, receiver=self) messages = self._mgmt_request_response_with_retry( REQUEST_RESPONSE_RECEIVE_BY_SEQUENCE_NUMBER, message, handler ) - for m in messages: - m._receiver = self # pylint: disable=protected-access return messages def peek_messages(self, message_count=1, sequence_number=None): @@ -460,7 +470,8 @@ def peek_messages(self, message_count=1, sequence_number=None): :param int message_count: The maximum number of messages to try and peek. The default value is 1. :param int sequence_number: A message sequence number from which to start browsing messages. - :rtype: list[~azure.servicebus.PeekMessage] + + :rtype: List[~azure.servicebus.PeekMessage] .. admonition:: Example: diff --git a/sdk/servicebus/azure-servicebus/azure/servicebus/_servicebus_sender.py b/sdk/servicebus/azure-servicebus/azure/servicebus/_servicebus_sender.py index d7b199502c69..a5a5131cac22 100644 --- a/sdk/servicebus/azure-servicebus/azure/servicebus/_servicebus_sender.py +++ b/sdk/servicebus/azure-servicebus/azure/servicebus/_servicebus_sender.py @@ -198,10 +198,10 @@ def schedule_messages(self, messages, schedule_time_utc): """Send Message or multiple Messages to be enqueued at a specific time. Returns a list of the sequence numbers of the enqueued messages. :param messages: The message or list of messages to schedule. - :type messages: ~azure.servicebus.Message or list[~azure.servicebus.Message] + :type messages: Union[~azure.servicebus.Message, List[~azure.servicebus.Message]] :param schedule_time_utc: The utc date and time to enqueue the messages. :type schedule_time_utc: ~datetime.datetime - :rtype: list[int] + :rtype: List[int] .. admonition:: Example: @@ -266,6 +266,7 @@ def from_connection_string( """Create a ServiceBusSender from a connection string. :param conn_str: The connection string of a Service Bus. + :type conn_str: str :keyword str queue_name: The path of specific Service Bus Queue the client connects to. Only one of queue_name or topic_name can be provided. :keyword str topic_name: The path of specific Service Bus Topic the client connects to. @@ -280,7 +281,8 @@ def from_connection_string( keys: `'proxy_hostname'` (str value) and `'proxy_port'` (int value). Additionally the following keys may also be present: `'username', 'password'`. :keyword str user_agent: If specified, this will be added in front of the built-in user agent string. - :rtype: ~azure.servicebus.ServiceBusSenderClient + + :rtype: ~azure.servicebus.ServiceBusSender .. admonition:: Example: diff --git a/sdk/servicebus/azure-servicebus/azure/servicebus/_servicebus_session_receiver.py b/sdk/servicebus/azure-servicebus/azure/servicebus/_servicebus_session_receiver.py index 417abe97bf98..e69d30d2f847 100644 --- a/sdk/servicebus/azure-servicebus/azure/servicebus/_servicebus_session_receiver.py +++ b/sdk/servicebus/azure-servicebus/azure/servicebus/_servicebus_session_receiver.py @@ -82,13 +82,14 @@ class ServiceBusSessionReceiver(ServiceBusReceiver, SessionReceiverMixin): """ def __init__(self, fully_qualified_namespace, credential, **kwargs): + # type: (str, TokenCredential, Any) -> None super(ServiceBusSessionReceiver, self).__init__(fully_qualified_namespace, credential, **kwargs) self._populate_session_attributes(**kwargs) self._session = ServiceBusSession(self._session_id, self, self._config.encoding) @property def session(self): - # type: ()->ServiceBusSession + # type: () -> ServiceBusSession """ Get the ServiceBusSession object linked with the receiver. Session is only available to session-enabled entities. @@ -115,7 +116,7 @@ def from_connection_string( # type: (str, Any) -> ServiceBusSessionReceiver """Create a ServiceBusSessionReceiver from a connection string. - :param conn_str: The connection string of a Service Bus. + :param str conn_str: The connection string of a Service Bus. :keyword str queue_name: The path of specific Service Bus Queue the client connects to. :keyword str topic_name: The path of specific Service Bus Topic which contains the Subscription the client connects to. diff --git a/sdk/servicebus/azure-servicebus/azure/servicebus/aio/_async_auto_lock_renewer.py b/sdk/servicebus/azure-servicebus/azure/servicebus/aio/_async_auto_lock_renewer.py index be006a36f1c8..fbbdc7cac69f 100644 --- a/sdk/servicebus/azure-servicebus/azure/servicebus/aio/_async_auto_lock_renewer.py +++ b/sdk/servicebus/azure-servicebus/azure/servicebus/aio/_async_auto_lock_renewer.py @@ -72,8 +72,12 @@ def _renewable(self, renewable: Union[ReceivedMessage, ServiceBusSession]) -> bo return False if renewable._lock_expired: return False - if not renewable._receiver._running: - return False + try: + if not renewable._receiver._running: # type: ignore + return False + except AttributeError: # If for whatever reason the renewable isn't hooked up to a receiver + raise ServiceBusError("Cannot renew an entity without an associated receiver. " + "ReceivedMessage and active ServiceBusReceiver.Session objects are expected.") return True async def _auto_lock_renew(self, diff --git a/sdk/servicebus/azure-servicebus/azure/servicebus/aio/_async_message.py b/sdk/servicebus/azure-servicebus/azure/servicebus/aio/_async_message.py index 6bd8c7122864..2e6a66e5559a 100644 --- a/sdk/servicebus/azure-servicebus/azure/servicebus/aio/_async_message.py +++ b/sdk/servicebus/azure-servicebus/azure/servicebus/aio/_async_message.py @@ -125,7 +125,8 @@ async def defer(self) -> None: # type: ignore await self._settle_message(MESSAGE_DEFER) self._settled = True - async def renew_lock(self) -> None: # type: ignore + async def renew_lock(self) -> None: + # pylint: disable=protected-access """Renew the message lock. This will maintain the lock on the message to ensure @@ -142,7 +143,7 @@ async def renew_lock(self) -> None: # type: ignore :raises: ~azure.servicebus.exceptions.MessageAlreadySettled is message has already been settled. """ try: - if self._receiver.session: # pylint: disable=protected-access + if self._receiver.session: # type: ignore raise TypeError("Session messages cannot be renewed. Please renew the Session lock instead.") except AttributeError: pass @@ -151,5 +152,5 @@ async def renew_lock(self) -> None: # type: ignore if not token: raise ValueError("Unable to renew lock - no lock token found.") - expiry = await self._receiver._renew_locks(token) # pylint: disable=protected-access + expiry = await self._receiver._renew_locks(token) # type: ignore self._expiry = utc_from_timestamp(expiry[MGMT_RESPONSE_MESSAGE_EXPIRATION][0]/1000.0) diff --git a/sdk/servicebus/azure-servicebus/azure/servicebus/aio/_base_handler_async.py b/sdk/servicebus/azure-servicebus/azure/servicebus/aio/_base_handler_async.py index f22ed2b058b1..125574319d79 100644 --- a/sdk/servicebus/azure-servicebus/azure/servicebus/aio/_base_handler_async.py +++ b/sdk/servicebus/azure-servicebus/azure/servicebus/aio/_base_handler_async.py @@ -9,7 +9,7 @@ import uamqp from uamqp.message import MessageProperties -from .._base_handler import _generate_sas_token +from .._base_handler import _generate_sas_token, _AccessToken from .._common._configuration import Configuration from .._common.utils import create_properties from .._common.constants import ( @@ -23,7 +23,7 @@ ) if TYPE_CHECKING: - from azure.core.credentials import TokenCredential + from azure.core.credentials import TokenCredential, AccessToken _LOGGER = logging.getLogger(__name__) @@ -35,12 +35,12 @@ class ServiceBusSharedKeyCredential(object): :param str key: The shared access key. """ - def __init__(self, policy: str, key: str): + def __init__(self, policy: str, key: str) -> None: self.policy = policy self.key = key self.token_type = TOKEN_TYPE_SASTOKEN - async def get_token(self, *scopes, **kwargs): # pylint:disable=unused-argument + async def get_token(self, *scopes: str, **kwargs: Any) -> _AccessToken: # pylint:disable=unused-argument if not scopes: raise ValueError("No token scope provided.") return _generate_sas_token(scopes[0], self.policy, self.key) diff --git a/sdk/servicebus/azure-servicebus/azure/servicebus/aio/_servicebus_client_async.py b/sdk/servicebus/azure-servicebus/azure/servicebus/aio/_servicebus_client_async.py index 066a82b9ea58..67cd7bff710c 100644 --- a/sdk/servicebus/azure-servicebus/azure/servicebus/aio/_servicebus_client_async.py +++ b/sdk/servicebus/azure-servicebus/azure/servicebus/aio/_servicebus_client_async.py @@ -2,7 +2,7 @@ # Copyright (c) Microsoft Corporation. All rights reserved. # Licensed under the MIT License. See License.txt in the project root for license information. # -------------------------------------------------------------------------------------------- -from typing import Any, TYPE_CHECKING +from typing import Any, TYPE_CHECKING, Union import uamqp @@ -98,7 +98,7 @@ def from_connection_string( """ Create a ServiceBusClient from a connection string. - :param conn_str: The connection string of a Service Bus. + :param str conn_str: The connection string of a Service Bus. :keyword str entity_name: Optional entity name, this can be the name of Queue or Topic. It must be specified if the credential is for specific Queue or Topic. :keyword bool logging_enable: Whether to output network trace logs to the logger. Default is `False`. @@ -125,7 +125,7 @@ def from_connection_string( return cls( fully_qualified_namespace=host, entity_name=entity_in_conn_str or kwargs.pop("entity_name", None), - credential=ServiceBusSharedKeyCredential(policy, key), + credential=ServiceBusSharedKeyCredential(policy, key), # type: ignore **kwargs ) @@ -431,7 +431,7 @@ def get_subscription_deadletter_receiver(self, topic_name, subscription_name, ** ) def get_subscription_session_receiver(self, topic_name, subscription_name, session_id=None, **kwargs): - # type: (str, str, str, Any) -> ServiceBusReceiver + # type: (str, str, str, Any) -> ServiceBusSessionReceiver """Get ServiceBusReceiver for the specific subscription under the topic. :param str topic_name: The name of specific Service Bus Topic the client connects to. diff --git a/sdk/servicebus/azure-servicebus/azure/servicebus/aio/_servicebus_receiver_async.py b/sdk/servicebus/azure-servicebus/azure/servicebus/aio/_servicebus_receiver_async.py index 571bcdc1610b..ac3a7a672f9d 100644 --- a/sdk/servicebus/azure-servicebus/azure/servicebus/aio/_servicebus_receiver_async.py +++ b/sdk/servicebus/azure-servicebus/azure/servicebus/aio/_servicebus_receiver_async.py @@ -103,7 +103,7 @@ def __init__( fully_qualified_namespace: str, credential: "TokenCredential", **kwargs: Any - ): + ) -> None: self._message_iter = None # type: Optional[AsyncIterator[ReceivedMessage]] if kwargs.get("entity_name"): super(ServiceBusReceiver, self).__init__( @@ -293,6 +293,15 @@ def get_streaming_message_iter(self, max_wait_time: float = None) -> AsyncIterat timeout period, the iterator will stop. :rtype AsyncIterator[ReceivedMessage] + + .. admonition:: Example: + + .. literalinclude:: ../samples/async_samples/sample_code_servicebus.py + :start-after: [START receive_forever_async] + :end-before: [END receive_forever_async] + :language: python + :dedent: 4 + :caption: Receive indefinitely from an iterator in streaming fashion. """ return self._IterContextualWrapper(self, max_wait_time) @@ -304,7 +313,7 @@ def from_connection_string( ) -> "ServiceBusReceiver": """Create a ServiceBusReceiver from a connection string. - :param conn_str: The connection string of a Service Bus. + :param str conn_str: The connection string of a Service Bus. :keyword str queue_name: The path of specific Service Bus Queue the client connects to. :keyword str topic_name: The path of specific Service Bus Topic which contains the Subscription the client connects to. @@ -437,14 +446,15 @@ async def receive_deferred_messages(self, sequence_numbers): self._populate_message_properties(message) - handler = functools.partial(mgmt_handlers.deferred_message_op, mode=self._mode, message_type=ReceivedMessage) + handler = functools.partial(mgmt_handlers.deferred_message_op, + mode=self._mode, + message_type=ReceivedMessage, + receiver=self) messages = await self._mgmt_request_response_with_retry( REQUEST_RESPONSE_RECEIVE_BY_SEQUENCE_NUMBER, message, handler ) - for m in messages: - m._receiver = self # pylint: disable=protected-access return messages async def peek_messages(self, message_count=1, sequence_number=0): diff --git a/sdk/servicebus/azure-servicebus/azure/servicebus/aio/_servicebus_sender_async.py b/sdk/servicebus/azure-servicebus/azure/servicebus/aio/_servicebus_sender_async.py index b82d8d754e51..4171981a4599 100644 --- a/sdk/servicebus/azure-servicebus/azure/servicebus/aio/_servicebus_sender_async.py +++ b/sdk/servicebus/azure-servicebus/azure/servicebus/aio/_servicebus_sender_async.py @@ -75,7 +75,7 @@ def __init__( fully_qualified_namespace: str, credential: "TokenCredential", **kwargs: Any - ): + ) -> None: if kwargs.get("entity_name"): super(ServiceBusSender, self).__init__( fully_qualified_namespace=fully_qualified_namespace, @@ -208,7 +208,7 @@ def from_connection_string( ) -> "ServiceBusSender": """Create a ServiceBusSender from a connection string. - :param conn_str: The connection string of a Service Bus. + :param str conn_str: The connection string of a Service Bus. :keyword str queue_name: The path of specific Service Bus Queue the client connects to. :keyword str topic_name: The path of specific Service Bus Topic the client connects to. :keyword bool logging_enable: Whether to output network trace logs to the logger. Default is `False`. diff --git a/sdk/servicebus/azure-servicebus/azure/servicebus/aio/_servicebus_session_async.py b/sdk/servicebus/azure-servicebus/azure/servicebus/aio/_servicebus_session_async.py index 9fe8b58c39a9..b2446d8ed411 100644 --- a/sdk/servicebus/azure-servicebus/azure/servicebus/aio/_servicebus_session_async.py +++ b/sdk/servicebus/azure-servicebus/azure/servicebus/aio/_servicebus_session_async.py @@ -73,6 +73,7 @@ async def set_session_state(self, state): :param state: The state value. :type state: str, bytes or bytearray + :rtype: None .. admonition:: Example: @@ -103,6 +104,8 @@ async def renew_lock(self): This operation can also be performed as a threaded background task by registering the session with an `azure.servicebus.aio.AutoLockRenew` instance. + :rtype: None + .. admonition:: Example: .. literalinclude:: ../samples/async_samples/sample_code_servicebus_async.py diff --git a/sdk/servicebus/azure-servicebus/azure/servicebus/aio/_servicebus_session_receiver_async.py b/sdk/servicebus/azure-servicebus/azure/servicebus/aio/_servicebus_session_receiver_async.py index f56336569da9..ffc808f7b819 100644 --- a/sdk/servicebus/azure-servicebus/azure/servicebus/aio/_servicebus_session_receiver_async.py +++ b/sdk/servicebus/azure-servicebus/azure/servicebus/aio/_servicebus_session_receiver_async.py @@ -86,7 +86,7 @@ def __init__( fully_qualified_namespace: str, credential: "TokenCredential", **kwargs: Any - ): + ) -> None: super(ServiceBusSessionReceiver, self).__init__(fully_qualified_namespace, credential, **kwargs) self._populate_session_attributes(**kwargs) self._session = ServiceBusSession(self._session_id, self, self._config.encoding) @@ -99,7 +99,7 @@ def from_connection_string( ) -> "ServiceBusSessionReceiver": """Create a ServiceBusSessionReceiver from a connection string. - :param conn_str: The connection string of a Service Bus. + :param str conn_str: The connection string of a Service Bus. :keyword str queue_name: The path of specific Service Bus Queue the client connects to. :keyword str topic_name: The path of specific Service Bus Topic which contains the Subscription the client connects to. diff --git a/sdk/servicebus/azure-servicebus/azure/servicebus/aio/management/_management_client_async.py b/sdk/servicebus/azure-servicebus/azure/servicebus/aio/management/_management_client_async.py index 2712c0076987..33aba6288654 100644 --- a/sdk/servicebus/azure-servicebus/azure/servicebus/aio/management/_management_client_async.py +++ b/sdk/servicebus/azure-servicebus/azure/servicebus/aio/management/_management_client_async.py @@ -130,7 +130,7 @@ async def _get_rule_element(self, topic_name, subscription_name, rule_name, **kw return element @classmethod - def from_connection_string(cls, conn_str: str, **kwargs) -> "ServiceBusManagementClient": + def from_connection_string(cls, conn_str: str, **kwargs: Any) -> "ServiceBusManagementClient": """Create a client from connection string. :param str conn_str: The connection string of the Service Bus Namespace. @@ -322,7 +322,7 @@ async def delete_queue(self, queue: Union[str, QueueProperties], **kwargs) -> No with _handle_response_error(): await self._impl.entity.delete(queue_name, api_version=constants.API_VERSION, **kwargs) - def list_queues(self, **kwargs) -> AsyncItemPaged[QueueProperties]: + def list_queues(self, **kwargs: Any) -> AsyncItemPaged[QueueProperties]: """List the queues of a ServiceBus namespace. :returns: An iterable (auto-paging) response of QueueProperties. @@ -342,7 +342,7 @@ def entry_to_qd(entry): return AsyncItemPaged( get_next, extract_data) - def list_queues_runtime_info(self, **kwargs) -> AsyncItemPaged[QueueRuntimeProperties]: + def list_queues_runtime_info(self, **kwargs: Any) -> AsyncItemPaged[QueueRuntimeProperties]: """List the runtime information of the queues in a ServiceBus namespace. :returns: An iterable (auto-paging) response of QueueRuntimeProperties. @@ -522,7 +522,7 @@ async def delete_topic(self, topic: Union[str, TopicProperties], **kwargs) -> No topic_name = topic await self._impl.entity.delete(topic_name, api_version=constants.API_VERSION, **kwargs) - def list_topics(self, **kwargs) -> AsyncItemPaged[TopicProperties]: + def list_topics(self, **kwargs: Any) -> AsyncItemPaged[TopicProperties]: """List the topics of a ServiceBus namespace. :returns: An iterable (auto-paging) response of TopicProperties. @@ -541,7 +541,7 @@ def entry_to_topic(entry): return AsyncItemPaged( get_next, extract_data) - def list_topics_runtime_info(self, **kwargs) -> AsyncItemPaged[TopicRuntimeProperties]: + def list_topics_runtime_info(self, **kwargs: Any) -> AsyncItemPaged[TopicRuntimeProperties]: """List the topics runtime information of a ServiceBus namespace. :returns: An iterable (auto-paging) response of TopicRuntimeProperties. @@ -753,7 +753,7 @@ async def delete_subscription( await self._impl.subscription.delete(topic_name, subscription_name, api_version=constants.API_VERSION, **kwargs) def list_subscriptions( - self, topic: Union[str, TopicProperties], **kwargs) -> AsyncItemPaged[SubscriptionProperties]: + self, topic: Union[str, TopicProperties], **kwargs: Any) -> AsyncItemPaged[SubscriptionProperties]: """List the subscriptions of a ServiceBus Topic. :param Union[str, ~azure.servicebus.management.TopicProperties] topic: The topic that owns the subscription. @@ -780,7 +780,7 @@ def entry_to_subscription(entry): get_next, extract_data) def list_subscriptions_runtime_info( - self, topic: Union[str, TopicProperties], **kwargs) -> AsyncItemPaged[SubscriptionRuntimeProperties]: + self, topic: Union[str, TopicProperties], **kwargs: Any) -> AsyncItemPaged[SubscriptionRuntimeProperties]: """List the subscriptions runtime information of a ServiceBus. :param Union[str, ~azure.servicebus.management.TopicProperties] topic: The topic that owns the subscription. @@ -960,7 +960,10 @@ async def delete_rule( topic_name, subscription_name, rule_name, api_version=constants.API_VERSION, **kwargs) def list_rules( - self, topic: Union[str, TopicProperties], subscription: Union[str, SubscriptionProperties], **kwargs + self, + topic: Union[str, TopicProperties], + subscription: Union[str, SubscriptionProperties], + **kwargs: Any ) -> AsyncItemPaged[RuleProperties]: """List the rules of a topic subscription. diff --git a/sdk/servicebus/azure-servicebus/azure/servicebus/exceptions.py b/sdk/servicebus/azure-servicebus/azure/servicebus/exceptions.py index e929b9e2b062..6daec6210d7b 100644 --- a/sdk/servicebus/azure-servicebus/azure/servicebus/exceptions.py +++ b/sdk/servicebus/azure-servicebus/azure/servicebus/exceptions.py @@ -4,6 +4,8 @@ # license information. # ------------------------------------------------------------------------- +from typing import Optional + from uamqp import errors, constants from ._common.constants import SESSION_LOCK_LOST, SESSION_LOCK_TIMEOUT @@ -158,6 +160,7 @@ class ServiceBusError(Exception): """ def __init__(self, message, inner_exception=None): + # type: (Optional[str], Optional[Exception]) -> None self.inner_exception = inner_exception super(ServiceBusError, self).__init__(message) @@ -205,6 +208,7 @@ class MessageAlreadySettled(MessageError): """ def __init__(self, action): + # type: (str) -> None message = "Unable to {} message as it has already been settled".format(action) super(MessageAlreadySettled, self).__init__(message) @@ -213,6 +217,7 @@ class MessageSettleFailed(ServiceBusError): """Attempt to settle a message failed.""" def __init__(self, action, inner_exception): + # type: (str, Exception) -> None message = "Failed to {} message. Error: {}".format(action, inner_exception) self.inner_exception = inner_exception super(MessageSettleFailed, self).__init__(message, inner_exception) @@ -222,12 +227,13 @@ class MessageSendFailed(ServiceBusError): """A message failed to send to the Service Bus entity.""" def __init__(self, inner_exception): + # type: (Exception) -> None message = "Message failed to send. Error: {}".format(inner_exception) self.condition = None self.description = None if hasattr(inner_exception, 'condition'): - self.condition = inner_exception.condition - self.description = inner_exception.description + self.condition = inner_exception.condition # type: ignore + self.description = inner_exception.description # type: ignore self.inner_exception = inner_exception super(MessageSendFailed, self).__init__(message, inner_exception) @@ -240,6 +246,7 @@ class MessageLockExpired(ServiceBusError): """ def __init__(self, message=None, inner_exception=None): + # type: (Optional[str], Optional[Exception]) -> None message = message or "Message lock expired" super(MessageLockExpired, self).__init__(message, inner_exception=inner_exception) @@ -252,6 +259,7 @@ class SessionLockExpired(ServiceBusError): """ def __init__(self, message=None, inner_exception=None): + # type: (Optional[str], Optional[Exception]) -> None message = message or "Session lock expired" super(SessionLockExpired, self).__init__(message, inner_exception=inner_exception) diff --git a/sdk/servicebus/azure-servicebus/azure/servicebus/management/_management_client.py b/sdk/servicebus/azure-servicebus/azure/servicebus/management/_management_client.py index 56aaeed2e111..9c326af09dd6 100644 --- a/sdk/servicebus/azure-servicebus/azure/servicebus/management/_management_client.py +++ b/sdk/servicebus/azure-servicebus/azure/servicebus/management/_management_client.py @@ -939,7 +939,7 @@ def update_rule(self, topic, subscription, rule, **kwargs): ) def delete_rule(self, topic, subscription, rule, **kwargs): - # type: (Union[str, TopicProperties], Union[str, SubscriptionProperties], Union[str, RuleProperties], Any) -> None # pylint:disable=line-too-long + # type: (Union[str,TopicProperties], Union[str,SubscriptionProperties], Union[str,RuleProperties], Any) -> None """Delete a topic subscription rule. :param Union[str, ~azure.servicebus.management.TopicProperties] topic: The topic that owns the subscription. diff --git a/sdk/servicebus/azure-servicebus/azure/servicebus/management/_models.py b/sdk/servicebus/azure-servicebus/azure/servicebus/management/_models.py index dc21c2e1ae13..1bf01ae807a5 100644 --- a/sdk/servicebus/azure-servicebus/azure/servicebus/management/_models.py +++ b/sdk/servicebus/azure-servicebus/azure/servicebus/management/_models.py @@ -260,7 +260,8 @@ class QueueRuntimeProperties(object): def __init__( self, ): - self._name = None + # type: () -> None + self._name = None # type: Optional[str] self._internal_qr = None # type: Optional[InternalQueueDescription] @classmethod @@ -501,7 +502,8 @@ class TopicRuntimeProperties(object): def __init__( self, ): - self._name = None + # type: () -> None + self._name = None # type: Optional[str] self._internal_td = None # type: Optional[InternalTopicDescription] @classmethod @@ -692,8 +694,9 @@ class SubscriptionRuntimeProperties(object): """ def __init__(self): + # type: () -> None self._internal_sd = None # type: Optional[InternalSubscriptionDescription] - self._name = None + self._name = None # type: Optional[str] @classmethod def _from_internal_entity(cls, name, internal_subscription): @@ -944,6 +947,7 @@ class TrueRuleFilter(SqlRuleFilter): """A sql filter with a sql expression that is always True """ def __init__(self): + # type: () -> None super(TrueRuleFilter, self).__init__("1=1", None, True) def _to_internal_entity(self): @@ -959,6 +963,7 @@ class FalseRuleFilter(SqlRuleFilter): """A sql filter with a sql expression that is always True """ def __init__(self): + # type: () -> None super(FalseRuleFilter, self).__init__("1>1", None, True) def _to_internal_entity(self): diff --git a/sdk/servicebus/azure-servicebus/samples/async_samples/sample_code_servicebus_async.py b/sdk/servicebus/azure-servicebus/samples/async_samples/sample_code_servicebus_async.py index e4b9fa76d014..cd336eb041c1 100644 --- a/sdk/servicebus/azure-servicebus/samples/async_samples/sample_code_servicebus_async.py +++ b/sdk/servicebus/azure-servicebus/samples/async_samples/sample_code_servicebus_async.py @@ -217,6 +217,13 @@ async def example_send_and_receive_async(): await message.complete() # [END receive_async] + # [START receive_forever_async] + async with servicebus_receiver: + async for message in servicebus_receiver.get_streaming_message_iter(): + print(str(message)) + await message.complete() + # [END receive_forever_async] + # [START auto_lock_renew_message_async] from azure.servicebus.aio import AutoLockRenew diff --git a/sdk/servicebus/azure-servicebus/samples/sync_samples/sample_code_servicebus.py b/sdk/servicebus/azure-servicebus/samples/sync_samples/sample_code_servicebus.py index 1e6393b8f288..28626350d835 100644 --- a/sdk/servicebus/azure-servicebus/samples/sync_samples/sample_code_servicebus.py +++ b/sdk/servicebus/azure-servicebus/samples/sync_samples/sample_code_servicebus.py @@ -258,6 +258,14 @@ def example_send_and_receive_sync(): message.abandon() # [END abandon_message] + # [START receive_forever] + with servicebus_receiver: + for message in servicebus_receiver.get_streaming_message_iter(): + print(str(message)) + message.complete() + # [END receive_forever] + + def example_receive_deferred_sync(): servicebus_sender = example_create_servicebus_sender_sync() servicebus_receiver = example_create_servicebus_receiver_sync() diff --git a/sdk/servicebus/azure-servicebus/tests/test_queues.py b/sdk/servicebus/azure-servicebus/tests/test_queues.py index 5d1b27892554..a745233f5478 100644 --- a/sdk/servicebus/azure-servicebus/tests/test_queues.py +++ b/sdk/servicebus/azure-servicebus/tests/test_queues.py @@ -1558,7 +1558,7 @@ def test_queue_message_properties(self): }, properties=uamqp.message.MessageProperties() ) - received_message = ReceivedMessage(uamqp_received_message) + received_message = ReceivedMessage(uamqp_received_message, receiver=None) assert received_message.partition_key == 'r_key' assert received_message.via_partition_key == 'r_via_key' assert received_message.scheduled_enqueue_time_utc == new_scheduled_time