Skip to content
Merged
Show file tree
Hide file tree
Changes from 4 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions sdk/servicebus/azure-servicebus/CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,7 @@

* Added method `get_topic_sender` in `ServiceBusClient` to get a `ServiceBusSender` for a topic.
* Added method `get_subscription_receiver` in `ServiceBusClient` to get a `ServiceBusReceiver` for a subscription under specific topic.
* `ServiceBusSender.send()` can now send a list of messages in one call, if they fit into a single batch. If they do not fit a `ValueError` is thrown.

**BugFixes**

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -75,7 +75,7 @@ def create_queue(
:type max_delivery_count: int
:param enable_batched_operations:
:type: enable_batched_operations: bool
:raises: ~azure.servicebus.common.errors.ServiceBusConnectionError if the namespace is not found.
:raises: ~azure.servicebus.exceptions.ServiceBusConnectionError if the namespace is not found.
:raises: ~azure.common.AzureConflictHttpError if a queue of the same name already exists.
"""
queue_properties = Queue(
Expand All @@ -102,8 +102,8 @@ def delete_queue(self, queue_name, fail_not_exist=False):
found. If set to True, a ServiceBusResourceNotFound will be raised.
Default value is False.
:type fail_not_exist: bool
:raises: ~azure.servicebus.common.errors.ServiceBusConnectionError if the namesapce is not found.
:raises: ~azure.servicebus.common.errors.ServiceBusResourceNotFound if the queue is not found
:raises: ~azure.servicebus.exceptions.ServiceBusConnectionError if the namesapce is not found.
:raises: ~azure.servicebus.exceptions.ServiceBusResourceNotFound if the queue is not found
and `fail_not_exist` is set to True.
"""
try:
Expand Down Expand Up @@ -137,7 +137,7 @@ def create_topic(
:type duplicate_detection_history_time_window: ~datetime.timedelta
:param enable_batched_operations:
:type: enable_batched_operations: bool
:raises: ~azure.servicebus.common.errors.ServiceBusConnectionError if the namespace is not found.
:raises: ~azure.servicebus.exceptions.ServiceBusConnectionError if the namespace is not found.
:raises: ~azure.common.AzureConflictHttpError if a topic of the same name already exists.
"""
topic_properties = Topic(
Expand All @@ -160,8 +160,8 @@ def delete_topic(self, topic_name, fail_not_exist=False):
found. If set to True, a ServiceBusResourceNotFound will be raised.
Default value is False.
:type fail_not_exist: bool
:raises: ~azure.servicebus.common.errors.ServiceBusConnectionError if the namesapce is not found.
:raises: ~azure.servicebus.common.errors.ServiceBusResourceNotFound if the topic is not found
:raises: ~azure.servicebus.exceptions.ServiceBusConnectionError if the namesapce is not found.
:raises: ~azure.servicebus.exceptions.ServiceBusResourceNotFound if the topic is not found
and `fail_not_exist` is set to True.
"""
try:
Expand Down Expand Up @@ -203,7 +203,7 @@ def create_subscription(
:type max_delivery_count: int
:param enable_batched_operations:
:type: enable_batched_operations: bool
:raises: ~azure.servicebus.common.errors.ServiceBusConnectionError if the namespace is not found.
:raises: ~azure.servicebus.exceptions.ServiceBusConnectionError if the namespace is not found.
:raises: ~azure.common.AzureConflictHttpError if a queue of the same name already exists.
"""
sub_properties = Subscription(
Expand Down Expand Up @@ -232,8 +232,8 @@ def delete_subscription(self, topic_name, subscription_name, fail_not_exist=Fals
topic is not found. If set to True, a ServiceBusResourceNotFound will be raised.
Default value is False.
:type fail_not_exist: bool
:raises: ~azure.servicebus.common.errors.ServiceBusConnectionError if the namesapce is not found.
:raises: ~azure.servicebus.common.errors.ServiceBusResourceNotFound if the entity is not found
:raises: ~azure.servicebus.exceptions.ServiceBusConnectionError if the namesapce is not found.
:raises: ~azure.servicebus.exceptions.ServiceBusResourceNotFound if the entity is not found
and `fail_not_exist` is set to True.
"""
try:
Expand Down Expand Up @@ -326,8 +326,8 @@ def get_properties(self):

:returns: The properties of the entity as a dictionary.
:rtype: dict[str, Any]
:raises: ~azure.servicebus.common.errors.ServiceBusResourceNotFound if the entity does not exist.
:raises: ~azure.servicebus.common.errors.ServiceBusConnectionError if the endpoint cannot be reached.
:raises: ~azure.servicebus.exceptions.ServiceBusResourceNotFound if the entity does not exist.
:raises: ~azure.servicebus.exceptions.ServiceBusConnectionError if the endpoint cannot be reached.
:raises: ~azure.common.AzureHTTPError if the credentials are invalid.
"""
try:
Expand Down
42 changes: 24 additions & 18 deletions sdk/servicebus/azure-servicebus/azure/servicebus/_common/message.py
Original file line number Diff line number Diff line change
Expand Up @@ -301,6 +301,12 @@ def __repr__(self):
def __len__(self):
return self._count

def _from_list(self, messages):
for each in messages:
if not isinstance(each, Message):
raise ValueError("Populating a message batch only supports iterables containing Message Objects. Received instead: {}".format(each.__class__.__name__))
self.add(each)

@property
def size_in_bytes(self):
# type: () -> int
Expand Down Expand Up @@ -599,10 +605,10 @@ def complete(self):
This removes the message from the queue.

:rtype: None
:raises: ~azure.servicebus.common.errors.MessageAlreadySettled if the message has been settled.
:raises: ~azure.servicebus.common.errors.MessageLockExpired if message lock has already expired.
:raises: ~azure.servicebus.common.errors.SessionLockExpired if session lock has already expired.
:raises: ~azure.servicebus.common.errors.MessageSettleFailed if message settle operation fails.
:raises: ~azure.servicebus.exceptions.MessageAlreadySettled if the message has been settled.
:raises: ~azure.servicebus.exceptions.MessageLockExpired if message lock has already expired.
:raises: ~azure.servicebus.exceptions.SessionLockExpired if session lock has already expired.
:raises: ~azure.servicebus.exceptions.MessageSettleFailed if message settle operation fails.
"""
# pylint: disable=protected-access
self._is_live(MESSAGE_COMPLETE)
Expand All @@ -620,10 +626,10 @@ def dead_letter(self, reason=None, description=None):
:param str reason: The reason for dead-lettering the message.
:param str description: The detailed description for dead-lettering the message.
:rtype: None
:raises: ~azure.servicebus.common.errors.MessageAlreadySettled if the message has been settled.
:raises: ~azure.servicebus.common.errors.MessageLockExpired if message lock has already expired.
:raises: ~azure.servicebus.common.errors.SessionLockExpired if session lock has already expired.
:raises: ~azure.servicebus.common.errors.MessageSettleFailed if message settle operation fails.
:raises: ~azure.servicebus.exceptions.MessageAlreadySettled if the message has been settled.
:raises: ~azure.servicebus.exceptions.MessageLockExpired if message lock has already expired.
:raises: ~azure.servicebus.exceptions.SessionLockExpired if session lock has already expired.
:raises: ~azure.servicebus.exceptions.MessageSettleFailed if message settle operation fails.
"""
# pylint: disable=protected-access
self._is_live(MESSAGE_DEAD_LETTER)
Expand All @@ -642,10 +648,10 @@ def abandon(self):
This message will be returned to the queue to be reprocessed.

:rtype: None
:raises: ~azure.servicebus.common.errors.MessageAlreadySettled if the message has been settled.
:raises: ~azure.servicebus.common.errors.MessageLockExpired if message lock has already expired.
:raises: ~azure.servicebus.common.errors.SessionLockExpired if session lock has already expired.
:raises: ~azure.servicebus.common.errors.MessageSettleFailed if message settle operation fails.
:raises: ~azure.servicebus.exceptions.MessageAlreadySettled if the message has been settled.
:raises: ~azure.servicebus.exceptions.MessageLockExpired if message lock has already expired.
:raises: ~azure.servicebus.exceptions.SessionLockExpired if session lock has already expired.
:raises: ~azure.servicebus.exceptions.MessageSettleFailed if message settle operation fails.
"""
# pylint: disable=protected-access
self._is_live(MESSAGE_ABANDON)
Expand All @@ -660,10 +666,10 @@ def defer(self):
specifically by its sequence number in order to be processed.

:rtype: None
:raises: ~azure.servicebus.common.errors.MessageAlreadySettled if the message has been settled.
:raises: ~azure.servicebus.common.errors.MessageLockExpired if message lock has already expired.
:raises: ~azure.servicebus.common.errors.SessionLockExpired if session lock has already expired.
:raises: ~azure.servicebus.common.errors.MessageSettleFailed if message settle operation fails.
:raises: ~azure.servicebus.exceptions.MessageAlreadySettled if the message has been settled.
:raises: ~azure.servicebus.exceptions.MessageLockExpired if message lock has already expired.
:raises: ~azure.servicebus.exceptions.SessionLockExpired if session lock has already expired.
:raises: ~azure.servicebus.exceptions.MessageSettleFailed if message settle operation fails.
"""
self._is_live(MESSAGE_DEFER)
self._settle_message(MESSAGE_DEFER)
Expand All @@ -682,8 +688,8 @@ def renew_lock(self):

:rtype: None
:raises: TypeError if the message is sessionful.
:raises: ~azure.servicebus.common.errors.MessageLockExpired is message lock has already expired.
:raises: ~azure.servicebus.common.errors.MessageAlreadySettled is message has already been settled.
:raises: ~azure.servicebus.exceptions.MessageLockExpired is message lock has already expired.
:raises: ~azure.servicebus.exceptions.MessageAlreadySettled is message has already been settled.
"""
try:
if self._receiver.session:
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -291,14 +291,19 @@ def from_connection_string(
return cls(**constructor_args)

def send(self, message):
# type: (Union[Message, BatchMessage]) -> None
# type: (Union[Message, BatchMessage, List[Message]]) -> None
"""Sends message and blocks until acknowledgement is received or operation times out.

If a list of messages was provided, attempts to send them as a single batch, throwing a
`ValueError` if they cannot fit in a single batch.

:param message: The ServiceBus message to be sent.
:type message: ~azure.servicebus.Message or ~azure.servicebus.BatchMessage
:rtype: None
:raises: ~azure.servicebus.common.errors.MessageSendFailed if the message fails to
send or ~azure.servicebus.common.errors.OperationTimeoutError if sending times out.
:raises: :class: ~azure.servicebus.exceptions.MessageSendFailed if the message fails to
send
:class: ~azure.servicebus.exceptions.OperationTimeoutError if sending times out.
:class: `ValueError` if list of messages is provided and cannot fit in a batch.

.. admonition:: Example:

Expand All @@ -310,6 +315,13 @@ def send(self, message):
:caption: Send message.

"""
try:
batch = self.create_batch()
batch._from_list(message)
message = batch
except TypeError: # Message was not a list or generator.
pass

self._do_retryable_operation(
self._send,
message=message,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -62,10 +62,10 @@ async def complete(self):
This removes the message from the queue.

:rtype: None
:raises: ~azure.servicebus.common.errors.MessageAlreadySettled if the message has been settled.
:raises: ~azure.servicebus.common.errors.MessageLockExpired if message lock has already expired.
:raises: ~azure.servicebus.common.errors.SessionLockExpired if session lock has already expired.
:raises: ~azure.servicebus.common.errors.MessageSettleFailed if message settle operation fails.
:raises: ~azure.servicebus.exceptions.MessageAlreadySettled if the message has been settled.
:raises: ~azure.servicebus.exceptions.MessageLockExpired if message lock has already expired.
:raises: ~azure.servicebus.exceptions.SessionLockExpired if session lock has already expired.
:raises: ~azure.servicebus.exceptions.MessageSettleFailed if message settle operation fails.
"""
# pylint: disable=protected-access
self._is_live(MESSAGE_COMPLETE)
Expand All @@ -83,9 +83,9 @@ async def dead_letter(self, reason=None, description=None):
:param str reason: The reason for dead-lettering the message.
:param str description: The detailed description for dead-lettering the message.
:rtype: None
:raises: ~azure.servicebus.common.errors.MessageAlreadySettled if the message has been settled.
:raises: ~azure.servicebus.common.errors.MessageLockExpired if message lock has already expired.
:raises: ~azure.servicebus.common.errors.MessageSettleFailed if message settle operation fails.
:raises: ~azure.servicebus.exceptions.MessageAlreadySettled if the message has been settled.
:raises: ~azure.servicebus.exceptions.MessageLockExpired if message lock has already expired.
:raises: ~azure.servicebus.exceptions.MessageSettleFailed if message settle operation fails.
"""
# pylint: disable=protected-access
self._is_live(MESSAGE_DEAD_LETTER)
Expand All @@ -97,9 +97,9 @@ async def abandon(self):
"""Abandon the message. This message will be returned to the queue to be reprocessed.

:rtype: None
:raises: ~azure.servicebus.common.errors.MessageAlreadySettled if the message has been settled.
:raises: ~azure.servicebus.common.errors.MessageLockExpired if message lock has already expired.
:raises: ~azure.servicebus.common.errors.MessageSettleFailed if message settle operation fails.
:raises: ~azure.servicebus.exceptions.MessageAlreadySettled if the message has been settled.
:raises: ~azure.servicebus.exceptions.MessageLockExpired if message lock has already expired.
:raises: ~azure.servicebus.exceptions.MessageSettleFailed if message settle operation fails.
"""
# pylint: disable=protected-access
self._is_live(MESSAGE_ABANDON)
Expand All @@ -111,9 +111,9 @@ async def defer(self):
"""Abandon the message. This message will be returned to the queue to be reprocessed.

:rtype: None
:raises: ~azure.servicebus.common.errors.MessageAlreadySettled if the message has been settled.
:raises: ~azure.servicebus.common.errors.MessageLockExpired if message lock has already expired.
:raises: ~azure.servicebus.common.errors.MessageSettleFailed if message settle operation fails.
:raises: ~azure.servicebus.exceptions.MessageAlreadySettled if the message has been settled.
:raises: ~azure.servicebus.exceptions.MessageLockExpired if message lock has already expired.
:raises: ~azure.servicebus.exceptions.MessageSettleFailed if message settle operation fails.
"""
# pylint: disable=protected-access
self._is_live(MESSAGE_DEFER)
Expand All @@ -133,9 +133,9 @@ async def renew_lock(self):

:rtype: None
:raises: TypeError if the message is sessionful.
:raises: ~azure.servicebus.common.errors.MessageLockExpired is message lock has already expired.
:raises: ~azure.servicebus.common.errors.SessionLockExpired if session lock has already expired.
:raises: ~azure.servicebus.common.errors.MessageAlreadySettled is message has already been settled.
:raises: ~azure.servicebus.exceptions.MessageLockExpired is message lock has already expired.
:raises: ~azure.servicebus.exceptions.SessionLockExpired if session lock has already expired.
:raises: ~azure.servicebus.exceptions.MessageAlreadySettled is message has already been settled.
"""
try:
if self._receiver.session: # pylint: disable=protected-access
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -239,14 +239,19 @@ def from_connection_string(
return cls(**constructor_args)

async def send(self, message):
# type: (Union[Message, BatchMessage]) -> None
# type: (Union[Message, BatchMessage, List[Message]]) -> None
"""Sends message and blocks until acknowledgement is received or operation times out.

If a list of messages was provided, attempts to send them as a single batch, throwing a
`ValueError` if they cannot fit in a single batch.

:param message: The ServiceBus message to be sent.
:type message: ~azure.servicebus.Message or ~azure.servicebus.BatchMessage
:rtype: None
:raises: ~azure.servicebus.common.errors.MessageSendFailed if the message fails to
send or ~azure.servicebus.common.errors.OperationTimeoutError if sending times out.
:raises: :class: ~azure.servicebus.exceptions.MessageSendFailed if the message fails to
send
:class: ~azure.servicebus.exceptions.OperationTimeoutError if sending times out.
:class: `ValueError` if list of messages is provided and cannot fit in a batch.

.. admonition:: Example:

Expand All @@ -258,6 +263,13 @@ async def send(self, message):
:caption: Send message.

"""
try:
batch = await self.create_batch()
batch._from_list(message)
message = batch
except TypeError: # Message was not a list or generator.
pass

await self._do_retryable_operation(
self._send,
message=message,
Expand Down
Loading