Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
23 changes: 12 additions & 11 deletions sdk/servicebus/azure-servicebus/README.md
Original file line number Diff line number Diff line change
Expand Up @@ -158,9 +158,9 @@ with ServiceBusClient.from_connection_string(connstr) as client:

### Receive messages from a queue

To receive from a queue, you can either perform an ad-hoc receive via "receiver.receive_messages()" or receive persistently through the receiver itself.
To receive from a queue, you can either perform an ad-hoc receive via `receiver.receive_messages()` or receive persistently through the receiver itself.

#### Receive messages from a queue through iterating over ServiceBusReceiver
#### [Receive messages from a queue through iterating over ServiceBusReceiver][streaming_receive_reference]

```Python
from azure.servicebus import ServiceBusClient
Expand All @@ -173,7 +173,7 @@ with ServiceBusClient.from_connection_string(connstr) as client:
# max_wait_time specifies how long the receiver should wait with no incoming messages before stopping receipt.
# Default is None; to receive forever.
with client.get_queue_receiver(queue_name, max_wait_time=30) as receiver:
for msg in receiver: # ServiceBusReceiver instance is a generator
for msg in receiver: # ServiceBusReceiver instance is a generator. This is equivilent to get_streaming_message_iter().
print(str(msg))
# If it is desired to halt receiving early, one can break out of the loop here safely.
```
Expand All @@ -183,7 +183,7 @@ with ServiceBusClient.from_connection_string(connstr) as client:
> See [AutoLockRenewer](#autolockrenew) for a helper to perform this in the background automatically.
> Lock duration is set in Azure on the queue or topic itself.

#### [Receive messages from a queue through `ServiceBusReceiver.receive_messages()`][receive_reference]
#### [Receive messages from a queue through ServiceBusReceiver.receive_messages()][receive_reference]

> **NOTE:** `ServiceBusReceiver.receive_messages()` receives a single or constrained list of messages through an ad-hoc method call, as opposed to receiving perpetually from the generator. It always returns a list.

Expand Down Expand Up @@ -446,14 +446,14 @@ contact [[email protected]](mailto:[email protected]) with any additio
[service_bus_overview]: https://docs.microsoft.com/azure/service-bus-messaging/service-bus-messaging-overview
[queue_status_codes]: https://docs.microsoft.com/rest/api/servicebus/create-queue#response-codes
[service_bus_docs]: https://docs.microsoft.com/azure/service-bus/
[service_bus_mgmt_docs]: https://docs.microsoft.com/en-us/python/api/overview/azure/servicebus/management?view=azure-python
[queue_concept]: https://docs.microsoft.com/en-us/azure/service-bus-messaging/service-bus-messaging-overview#queues
[topic_concept]: https://docs.microsoft.com/en-us/azure/service-bus-messaging/service-bus-messaging-overview#topics
[subscription_concept]: https://docs.microsoft.com/en-us/azure/service-bus-messaging/service-bus-queues-topics-subscriptions#topics-and-subscriptions
[azure_namespace_creation]: https://docs.microsoft.com/en-us/azure/service-bus-messaging/service-bus-create-namespace-portal
[service_bus_mgmt_docs]: https://docs.microsoft.com/python/api/overview/azure/servicebus/management?view=azure-python
[queue_concept]: https://docs.microsoft.com/azure/service-bus-messaging/service-bus-messaging-overview#queues
[topic_concept]: https://docs.microsoft.com/azure/service-bus-messaging/service-bus-messaging-overview#topics
[subscription_concept]: https://docs.microsoft.com/azure/service-bus-messaging/service-bus-queues-topics-subscriptions#topics-and-subscriptions
[azure_namespace_creation]: https://docs.microsoft.com/azure/service-bus-messaging/service-bus-create-namespace-portal
[servicebus_management_repository]: https://github.com/Azure/azure-sdk-for-python/tree/master/sdk/servicebus/azure-mgmt-servicebus
[get_servicebus_conn_str]: https://docs.microsoft.com/en-us/azure/service-bus-messaging/service-bus-create-namespace-portal#get-the-connection-string
[servicebus_aad_authentication]: https://docs.microsoft.com/en-us/azure/service-bus-messaging/service-bus-authentication-and-authorization
[get_servicebus_conn_str]: https://docs.microsoft.com/azure/service-bus-messaging/service-bus-create-namespace-portal#get-the-connection-string
[servicebus_aad_authentication]: https://docs.microsoft.com/azure/service-bus-messaging/service-bus-authentication-and-authorization
[token_credential_interface]: ../../core/azure-core/azure/core/credentials.py
[pypi_azure_identity]: https://pypi.org/project/azure-identity/
[message_reference]: https://azuresdkdocs.blob.core.windows.net/$web/python/azure-servicebus/latest/azure.servicebus.html#azure.servicebus.Message
Expand All @@ -462,6 +462,7 @@ contact [[email protected]](mailto:[email protected]) with any additio
[client_reference]: https://azuresdkdocs.blob.core.windows.net/$web/python/azure-servicebus/latest/azure.servicebus.html#azure.servicebus.ServiceBusClient
[send_reference]: https://azuresdkdocs.blob.core.windows.net/$web/python/azure-servicebus/latest/azure.servicebus.html?highlight=send_messages#azure.servicebus.ServiceBusSender.send_messages
[receive_reference]: https://azuresdkdocs.blob.core.windows.net/$web/python/azure-servicebus/latest/azure.servicebus.html?highlight=receive#azure.servicebus.ServiceBusReceiver.receive_messages
[streaming_receive_reference]: https://azuresdkdocs.blob.core.windows.net/$web/python/azure-servicebus/latest/azure.servicebus.html?highlight=get_streaming_message_iter#azure.servicebus.ServiceBusReceiver.get_streaming_message_iter
[session_receive_reference]: https://azuresdkdocs.blob.core.windows.net/$web/python/azure-servicebus/latest/azure.servicebus.html?highlight=receive#azure.servicebus.ServiceBusSessionReceiver.receive_messages
[session_send_reference]: https://azuresdkdocs.blob.core.windows.net/$web/python/azure-servicebus/latest/azure.servicebus.html?highlight=session_id#azure.servicebus.Message.session_id
[complete_reference]: https://azuresdkdocs.blob.core.windows.net/$web/python/azure-servicebus/latest/azure.servicebus.html?highlight=complete#azure.servicebus.ReceivedMessage.complete
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -53,6 +53,17 @@ class AutoLockRenew(object):
"""

def __init__(self, executor=None, max_workers=None):
# type: (ThreadPoolExecutor, int) -> None
"""Auto renew locks for messages and sessions using a background thread pool.

:param executor: A user-specified thread pool. This cannot be combined with
setting `max_workers`.
:type executor: ~concurrent.futures.ThreadPoolExecutor
:param max_workers: Specify the maximum workers in the thread pool. If not
specified the number used will be derived from the core count of the environment.
This cannot be combined with `executor`.
:type max_workers: int
"""
self._executor = executor or ThreadPoolExecutor(max_workers=max_workers)
self._shutdown = threading.Event()
self._sleep_time = 1
Expand Down Expand Up @@ -109,16 +120,18 @@ def _auto_lock_renew(self, renewable, starttime, timeout, on_lock_renew_failure=
on_lock_renew_failure(renewable, error)

def register(self, renewable, timeout=300, on_lock_renew_failure=None):
# type: (Union[ReceivedMessage, ServiceBusSession], float, Optional[LockRenewFailureCallback]) -> None
"""Register a renewable entity for automatic lock renewal.

:param renewable: A locked entity that needs to be renewed.
:type renewable: ~azure.servicebus.ReceivedMessage or
~azure.servicebus.ServiceBusSession
:param float timeout: A time in seconds that the lock should be maintained for.
Default value is 300 (5 minutes).
:param Optional[LockRenewFailureCallback] on_lock_renew_failure:
A callback may be specified to be called when the lock is lost on the renewable that is being registered.
Default value is None (no callback).
:type renewable: Union[~azure.servicebus.ReceivedMessage, ~azure.servicebus.ServiceBusSession]
:param timeout: A time in seconds that the lock should be maintained for. Default value is 300 (5 minutes).
:type timeout: float
:param on_lock_renew_failure: A callback may be specified to be called when the lock is lost on the renewable
that is being registered. Default value is None (no callback).
:type on_lock_renew_failure: Optional[LockRenewFailureCallback]

:rtype: None
"""
if self._shutdown.is_set():
raise ServiceBusError("The AutoLockRenew has already been shutdown. Please create a new instance for"
Expand All @@ -131,6 +144,8 @@ def close(self, wait=True):

:param wait: Whether to block until thread pool has shutdown. Default is `True`.
:type wait: bool

:rtype: None
"""
self._shutdown.set()
self._executor.shutdown(wait=wait)
34 changes: 22 additions & 12 deletions sdk/servicebus/azure-servicebus/azure/servicebus/_common/message.py
Original file line number Diff line number Diff line change
Expand Up @@ -9,7 +9,7 @@
import uuid
import functools
import logging
from typing import Optional, List, Union, Iterable, TYPE_CHECKING, Callable
from typing import Optional, List, Union, Iterable, TYPE_CHECKING, Callable, Any

import uamqp.message

Expand Down Expand Up @@ -52,7 +52,8 @@
MessageLockExpired,
SessionLockExpired,
MessageSettleFailed,
MessageContentTooLarge)
MessageContentTooLarge,
ServiceBusError)
from .utils import utc_from_timestamp, utc_now, copy_messages_to_sendable_if_needed
if TYPE_CHECKING:
from .._servicebus_receiver import ServiceBusReceiver
Expand All @@ -65,7 +66,7 @@ class Message(object): # pylint: disable=too-many-public-methods,too-many-insta
"""A Service Bus Message.

:param body: The data to send in a single message.
:type body: str or bytes
:type body: Union[str, bytes]

:keyword dict properties: The user defined properties on the message.
:keyword str session_id: The session identifier of the message for a sessionful entity.
Expand Down Expand Up @@ -95,6 +96,7 @@ class Message(object): # pylint: disable=too-many-public-methods,too-many-insta
"""

def __init__(self, body, **kwargs):
# type: (Union[str, bytes], Any) -> None
# Although we might normally thread through **kwargs this causes
# problems as MessageProperties won't absorb spurious args.
self._encoding = kwargs.pop("encoding", 'UTF-8')
Expand Down Expand Up @@ -491,7 +493,6 @@ class BatchMessage(object):
:vartype message: ~uamqp.BatchMessage

:param int max_size_in_bytes: The maximum size of bytes data that a BatchMessage object can hold.

"""
def __init__(self, max_size_in_bytes=None):
# type: (Optional[int]) -> None
Expand Down Expand Up @@ -570,11 +571,11 @@ class PeekMessage(Message):
This message is still on the queue, and unlocked.
A peeked message cannot be completed, abandoned, dead-lettered or deferred.
It has no lock token or expiry.

"""

def __init__(self, message):
super(PeekMessage, self).__init__(None, message=message)
# type: (uamqp.message.Message) -> None
super(PeekMessage, self).__init__(None, message=message) # type: ignore

def _to_outgoing_message(self):
# type: () -> Message
Expand Down Expand Up @@ -741,12 +742,17 @@ class ReceivedMessageBase(PeekMessage):
"""

def __init__(self, message, mode=ReceiveSettleMode.PeekLock, **kwargs):
# type: (uamqp.message.Message, ReceiveSettleMode, Any) -> None
super(ReceivedMessageBase, self).__init__(message=message)
self._settled = (mode == ReceiveSettleMode.ReceiveAndDelete)
self._received_timestamp_utc = utc_now()
self._is_deferred_message = kwargs.get("is_deferred_message", False)
self.auto_renew_error = None
self._receiver = None # type: ignore
self.auto_renew_error = None # type: Optional[Exception]
try:
self._receiver = kwargs.pop("receiver") # type: Union[ServiceBusReceiver, ServiceBusSessionReceiver]
except KeyError:
raise TypeError("ReceivedMessage requires a receiver to be initialized. This class should never be" + \
"initialized by a user; the Message class should be utilized instead.")
self._expiry = None

def _check_live(self, action):
Expand All @@ -769,6 +775,7 @@ def _check_live(self, action):
def _settle_via_mgmt_link(self, settle_operation, dead_letter_reason=None, dead_letter_description=None):
# type: (str, Optional[str], Optional[str]) -> Callable
# pylint: disable=protected-access

if settle_operation == MESSAGE_COMPLETE:
return functools.partial(
self._receiver._settle_message,
Expand Down Expand Up @@ -822,13 +829,14 @@ def _settle_via_receiver_link(self, settle_operation, dead_letter_reason=None, d
@property
def _lock_expired(self):
# type: () -> bool
# pylint: disable=protected-access
"""
Whether the lock on the message has expired.

:rtype: bool
"""
try:
if self._receiver.session: # pylint: disable=protected-access
if self._receiver.session: # type: ignore
raise TypeError("Session messages do not expire. Please use the Session expiry instead.")
except AttributeError: # Is not a session receiver
pass
Expand Down Expand Up @@ -859,6 +867,7 @@ def lock_token(self):
@property
def locked_until_utc(self):
# type: () -> Optional[datetime.datetime]
# pylint: disable=protected-access
"""
The UTC datetime until which the message will be locked in the queue/subscription.
When the lock expires, delivery count of hte message is incremented and the message
Expand All @@ -867,7 +876,7 @@ def locked_until_utc(self):
:rtype: datetime.datetime
"""
try:
if self._settled or self._receiver.session: # pylint: disable=protected-access
if self._settled or self._receiver.session: # type: ignore
return None
except AttributeError: # not settled, and isn't session receiver.
pass
Expand Down Expand Up @@ -1021,6 +1030,7 @@ def defer(self):

def renew_lock(self):
# type: () -> None
# pylint: disable=protected-access,no-member
"""Renew the message lock.

This will maintain the lock on the message to ensure it is not returned to the queue
Expand All @@ -1041,7 +1051,7 @@ def renew_lock(self):
:raises: ~azure.servicebus.exceptions.MessageAlreadySettled is message has already been settled.
"""
try:
if self._receiver.session:
if self._receiver.session: # type: ignore
raise TypeError("Session messages cannot be renewed. Please renew the Session lock instead.")
except AttributeError:
pass
Expand All @@ -1050,5 +1060,5 @@ def renew_lock(self):
if not token:
raise ValueError("Unable to renew lock - no lock token found.")

expiry = self._receiver._renew_locks(token) # pylint: disable=protected-access,no-member
expiry = self._receiver._renew_locks(token) # type: ignore
self._expiry = utc_from_timestamp(expiry[MGMT_RESPONSE_MESSAGE_EXPIRATION][0]/1000.0)
Original file line number Diff line number Diff line change
Expand Up @@ -62,14 +62,15 @@ def deferred_message_op(
status_code,
message,
description,
receiver,
mode=ReceiveSettleMode.PeekLock,
message_type=ReceivedMessage
):
if status_code == 200:
parsed = []
for m in message.get_data()[b'messages']:
wrapped = uamqp.Message.decode_from_bytes(bytearray(m[b'message']))
parsed.append(message_type(wrapped, mode, is_deferred_message=True))
parsed.append(message_type(wrapped, mode, is_deferred_message=True, receiver=receiver))
return parsed
if status_code in [202, 204]:
return []
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -51,8 +51,7 @@ def _populate_attributes(self, **kwargs):
self._max_wait_time = kwargs.get("max_wait_time", None)

def _build_message(self, received, message_type=ReceivedMessage):
message = message_type(message=received, mode=self._mode)
message._receiver = self # pylint: disable=protected-access
message = message_type(message=received, mode=self._mode, receiver=self)
self._last_received_sequenced_number = message.sequence_number
return message

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -430,7 +430,7 @@ def get_subscription_deadletter_receiver(self, topic_name, subscription_name, **
)

def get_subscription_session_receiver(self, topic_name, subscription_name, session_id=None, **kwargs):
# type: (str, str, str, Any) -> ServiceBusReceiver
# type: (str, str, str, Any) -> ServiceBusSessionReceiver
"""Get ServiceBusReceiver for the specific subscription under the topic.

:param str topic_name: The name of specific Service Bus Topic the client connects to.
Expand Down
Loading