Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions sdk/servicebus/azure-servicebus/CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,7 @@

* Added method `get_topic_sender` in `ServiceBusClient` to get a `ServiceBusSender` for a topic.
* Added method `get_subscription_receiver` in `ServiceBusClient` to get a `ServiceBusReceiver` for a subscription under specific topic.
* `ServiceBusSender.send()` can now send a list of messages in one call, if they fit into a single batch. If they do not fit a `ValueError` is thrown.

**BugFixes**

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -75,7 +75,7 @@ def create_queue(
:type max_delivery_count: int
:param enable_batched_operations:
:type: enable_batched_operations: bool
:raises: ~azure.servicebus.common.errors.ServiceBusConnectionError if the namespace is not found.
:raises: ~azure.servicebus.exceptions.ServiceBusConnectionError if the namespace is not found.
:raises: ~azure.common.AzureConflictHttpError if a queue of the same name already exists.
"""
queue_properties = Queue(
Expand All @@ -102,8 +102,8 @@ def delete_queue(self, queue_name, fail_not_exist=False):
found. If set to True, a ServiceBusResourceNotFound will be raised.
Default value is False.
:type fail_not_exist: bool
:raises: ~azure.servicebus.common.errors.ServiceBusConnectionError if the namesapce is not found.
:raises: ~azure.servicebus.common.errors.ServiceBusResourceNotFound if the queue is not found
:raises: ~azure.servicebus.exceptions.ServiceBusConnectionError if the namesapce is not found.
:raises: ~azure.servicebus.exceptions.ServiceBusResourceNotFound if the queue is not found
and `fail_not_exist` is set to True.
"""
try:
Expand Down Expand Up @@ -137,7 +137,7 @@ def create_topic(
:type duplicate_detection_history_time_window: ~datetime.timedelta
:param enable_batched_operations:
:type: enable_batched_operations: bool
:raises: ~azure.servicebus.common.errors.ServiceBusConnectionError if the namespace is not found.
:raises: ~azure.servicebus.exceptions.ServiceBusConnectionError if the namespace is not found.
:raises: ~azure.common.AzureConflictHttpError if a topic of the same name already exists.
"""
topic_properties = Topic(
Expand All @@ -160,8 +160,8 @@ def delete_topic(self, topic_name, fail_not_exist=False):
found. If set to True, a ServiceBusResourceNotFound will be raised.
Default value is False.
:type fail_not_exist: bool
:raises: ~azure.servicebus.common.errors.ServiceBusConnectionError if the namesapce is not found.
:raises: ~azure.servicebus.common.errors.ServiceBusResourceNotFound if the topic is not found
:raises: ~azure.servicebus.exceptions.ServiceBusConnectionError if the namesapce is not found.
:raises: ~azure.servicebus.exceptions.ServiceBusResourceNotFound if the topic is not found
and `fail_not_exist` is set to True.
"""
try:
Expand Down Expand Up @@ -203,7 +203,7 @@ def create_subscription(
:type max_delivery_count: int
:param enable_batched_operations:
:type: enable_batched_operations: bool
:raises: ~azure.servicebus.common.errors.ServiceBusConnectionError if the namespace is not found.
:raises: ~azure.servicebus.exceptions.ServiceBusConnectionError if the namespace is not found.
:raises: ~azure.common.AzureConflictHttpError if a queue of the same name already exists.
"""
sub_properties = Subscription(
Expand Down Expand Up @@ -232,8 +232,8 @@ def delete_subscription(self, topic_name, subscription_name, fail_not_exist=Fals
topic is not found. If set to True, a ServiceBusResourceNotFound will be raised.
Default value is False.
:type fail_not_exist: bool
:raises: ~azure.servicebus.common.errors.ServiceBusConnectionError if the namesapce is not found.
:raises: ~azure.servicebus.common.errors.ServiceBusResourceNotFound if the entity is not found
:raises: ~azure.servicebus.exceptions.ServiceBusConnectionError if the namesapce is not found.
:raises: ~azure.servicebus.exceptions.ServiceBusResourceNotFound if the entity is not found
and `fail_not_exist` is set to True.
"""
try:
Expand Down Expand Up @@ -326,8 +326,8 @@ def get_properties(self):

:returns: The properties of the entity as a dictionary.
:rtype: dict[str, Any]
:raises: ~azure.servicebus.common.errors.ServiceBusResourceNotFound if the entity does not exist.
:raises: ~azure.servicebus.common.errors.ServiceBusConnectionError if the endpoint cannot be reached.
:raises: ~azure.servicebus.exceptions.ServiceBusResourceNotFound if the entity does not exist.
:raises: ~azure.servicebus.exceptions.ServiceBusConnectionError if the endpoint cannot be reached.
:raises: ~azure.common.AzureHTTPError if the credentials are invalid.
"""
try:
Expand Down
42 changes: 24 additions & 18 deletions sdk/servicebus/azure-servicebus/azure/servicebus/_common/message.py
Original file line number Diff line number Diff line change
Expand Up @@ -301,6 +301,12 @@ def __repr__(self):
def __len__(self):
return self._count

def _from_list(self, messages):
for each in messages:
if not isinstance(each, Message):
raise ValueError("Populating a message batch only supports iterables containing Message Objects. Received instead: {}".format(each.__class__.__name__))
self.add(each)

@property
def size_in_bytes(self):
# type: () -> int
Expand Down Expand Up @@ -599,10 +605,10 @@ def complete(self):
This removes the message from the queue.

:rtype: None
:raises: ~azure.servicebus.common.errors.MessageAlreadySettled if the message has been settled.
:raises: ~azure.servicebus.common.errors.MessageLockExpired if message lock has already expired.
:raises: ~azure.servicebus.common.errors.SessionLockExpired if session lock has already expired.
:raises: ~azure.servicebus.common.errors.MessageSettleFailed if message settle operation fails.
:raises: ~azure.servicebus.exceptions.MessageAlreadySettled if the message has been settled.
:raises: ~azure.servicebus.exceptions.MessageLockExpired if message lock has already expired.
:raises: ~azure.servicebus.exceptions.SessionLockExpired if session lock has already expired.
:raises: ~azure.servicebus.exceptions.MessageSettleFailed if message settle operation fails.
"""
# pylint: disable=protected-access
self._is_live(MESSAGE_COMPLETE)
Expand All @@ -620,10 +626,10 @@ def dead_letter(self, reason=None, description=None):
:param str reason: The reason for dead-lettering the message.
:param str description: The detailed description for dead-lettering the message.
:rtype: None
:raises: ~azure.servicebus.common.errors.MessageAlreadySettled if the message has been settled.
:raises: ~azure.servicebus.common.errors.MessageLockExpired if message lock has already expired.
:raises: ~azure.servicebus.common.errors.SessionLockExpired if session lock has already expired.
:raises: ~azure.servicebus.common.errors.MessageSettleFailed if message settle operation fails.
:raises: ~azure.servicebus.exceptions.MessageAlreadySettled if the message has been settled.
:raises: ~azure.servicebus.exceptions.MessageLockExpired if message lock has already expired.
:raises: ~azure.servicebus.exceptions.SessionLockExpired if session lock has already expired.
:raises: ~azure.servicebus.exceptions.MessageSettleFailed if message settle operation fails.
"""
# pylint: disable=protected-access
self._is_live(MESSAGE_DEAD_LETTER)
Expand All @@ -642,10 +648,10 @@ def abandon(self):
This message will be returned to the queue to be reprocessed.

:rtype: None
:raises: ~azure.servicebus.common.errors.MessageAlreadySettled if the message has been settled.
:raises: ~azure.servicebus.common.errors.MessageLockExpired if message lock has already expired.
:raises: ~azure.servicebus.common.errors.SessionLockExpired if session lock has already expired.
:raises: ~azure.servicebus.common.errors.MessageSettleFailed if message settle operation fails.
:raises: ~azure.servicebus.exceptions.MessageAlreadySettled if the message has been settled.
:raises: ~azure.servicebus.exceptions.MessageLockExpired if message lock has already expired.
:raises: ~azure.servicebus.exceptions.SessionLockExpired if session lock has already expired.
:raises: ~azure.servicebus.exceptions.MessageSettleFailed if message settle operation fails.
"""
# pylint: disable=protected-access
self._is_live(MESSAGE_ABANDON)
Expand All @@ -660,10 +666,10 @@ def defer(self):
specifically by its sequence number in order to be processed.

:rtype: None
:raises: ~azure.servicebus.common.errors.MessageAlreadySettled if the message has been settled.
:raises: ~azure.servicebus.common.errors.MessageLockExpired if message lock has already expired.
:raises: ~azure.servicebus.common.errors.SessionLockExpired if session lock has already expired.
:raises: ~azure.servicebus.common.errors.MessageSettleFailed if message settle operation fails.
:raises: ~azure.servicebus.exceptions.MessageAlreadySettled if the message has been settled.
:raises: ~azure.servicebus.exceptions.MessageLockExpired if message lock has already expired.
:raises: ~azure.servicebus.exceptions.SessionLockExpired if session lock has already expired.
:raises: ~azure.servicebus.exceptions.MessageSettleFailed if message settle operation fails.
"""
self._is_live(MESSAGE_DEFER)
self._settle_message(MESSAGE_DEFER)
Expand All @@ -682,8 +688,8 @@ def renew_lock(self):

:rtype: None
:raises: TypeError if the message is sessionful.
:raises: ~azure.servicebus.common.errors.MessageLockExpired is message lock has already expired.
:raises: ~azure.servicebus.common.errors.MessageAlreadySettled is message has already been settled.
:raises: ~azure.servicebus.exceptions.MessageLockExpired is message lock has already expired.
:raises: ~azure.servicebus.exceptions.MessageAlreadySettled is message has already been settled.
"""
try:
if self._receiver.session:
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -291,14 +291,19 @@ def from_connection_string(
return cls(**constructor_args)

def send(self, message):
# type: (Union[Message, BatchMessage]) -> None
# type: (Union[Message, BatchMessage, List[Message]]) -> None
"""Sends message and blocks until acknowledgement is received or operation times out.

If a list of messages was provided, attempts to send them as a single batch, throwing a
`ValueError` if they cannot fit in a single batch.

:param message: The ServiceBus message to be sent.
:type message: ~azure.servicebus.Message or ~azure.servicebus.BatchMessage
:type message: ~azure.servicebus.Message or ~azure.servicebus.BatchMessage or list[~azure.servicebus.Message]
:rtype: None
:raises: ~azure.servicebus.common.errors.MessageSendFailed if the message fails to
send or ~azure.servicebus.common.errors.OperationTimeoutError if sending times out.
:raises: :class: ~azure.servicebus.exceptions.MessageSendFailed if the message fails to
send
:class: ~azure.servicebus.exceptions.OperationTimeoutError if sending times out.
:class: `ValueError` if list of messages is provided and cannot fit in a batch.

.. admonition:: Example:

Expand All @@ -310,6 +315,13 @@ def send(self, message):
:caption: Send message.

"""
try:
batch = self.create_batch()
batch._from_list(message)
message = batch
except TypeError: # Message was not a list or generator.
pass

self._do_retryable_operation(
self._send,
message=message,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -62,10 +62,10 @@ async def complete(self):
This removes the message from the queue.

:rtype: None
:raises: ~azure.servicebus.common.errors.MessageAlreadySettled if the message has been settled.
:raises: ~azure.servicebus.common.errors.MessageLockExpired if message lock has already expired.
:raises: ~azure.servicebus.common.errors.SessionLockExpired if session lock has already expired.
:raises: ~azure.servicebus.common.errors.MessageSettleFailed if message settle operation fails.
:raises: ~azure.servicebus.exceptions.MessageAlreadySettled if the message has been settled.
:raises: ~azure.servicebus.exceptions.MessageLockExpired if message lock has already expired.
:raises: ~azure.servicebus.exceptions.SessionLockExpired if session lock has already expired.
:raises: ~azure.servicebus.exceptions.MessageSettleFailed if message settle operation fails.
"""
# pylint: disable=protected-access
self._is_live(MESSAGE_COMPLETE)
Expand All @@ -83,9 +83,9 @@ async def dead_letter(self, reason=None, description=None):
:param str reason: The reason for dead-lettering the message.
:param str description: The detailed description for dead-lettering the message.
:rtype: None
:raises: ~azure.servicebus.common.errors.MessageAlreadySettled if the message has been settled.
:raises: ~azure.servicebus.common.errors.MessageLockExpired if message lock has already expired.
:raises: ~azure.servicebus.common.errors.MessageSettleFailed if message settle operation fails.
:raises: ~azure.servicebus.exceptions.MessageAlreadySettled if the message has been settled.
:raises: ~azure.servicebus.exceptions.MessageLockExpired if message lock has already expired.
:raises: ~azure.servicebus.exceptions.MessageSettleFailed if message settle operation fails.
"""
# pylint: disable=protected-access
self._is_live(MESSAGE_DEAD_LETTER)
Expand All @@ -97,9 +97,9 @@ async def abandon(self):
"""Abandon the message. This message will be returned to the queue to be reprocessed.

:rtype: None
:raises: ~azure.servicebus.common.errors.MessageAlreadySettled if the message has been settled.
:raises: ~azure.servicebus.common.errors.MessageLockExpired if message lock has already expired.
:raises: ~azure.servicebus.common.errors.MessageSettleFailed if message settle operation fails.
:raises: ~azure.servicebus.exceptions.MessageAlreadySettled if the message has been settled.
:raises: ~azure.servicebus.exceptions.MessageLockExpired if message lock has already expired.
:raises: ~azure.servicebus.exceptions.MessageSettleFailed if message settle operation fails.
"""
# pylint: disable=protected-access
self._is_live(MESSAGE_ABANDON)
Expand All @@ -111,9 +111,9 @@ async def defer(self):
"""Abandon the message. This message will be returned to the queue to be reprocessed.

:rtype: None
:raises: ~azure.servicebus.common.errors.MessageAlreadySettled if the message has been settled.
:raises: ~azure.servicebus.common.errors.MessageLockExpired if message lock has already expired.
:raises: ~azure.servicebus.common.errors.MessageSettleFailed if message settle operation fails.
:raises: ~azure.servicebus.exceptions.MessageAlreadySettled if the message has been settled.
:raises: ~azure.servicebus.exceptions.MessageLockExpired if message lock has already expired.
:raises: ~azure.servicebus.exceptions.MessageSettleFailed if message settle operation fails.
"""
# pylint: disable=protected-access
self._is_live(MESSAGE_DEFER)
Expand All @@ -133,9 +133,9 @@ async def renew_lock(self):

:rtype: None
:raises: TypeError if the message is sessionful.
:raises: ~azure.servicebus.common.errors.MessageLockExpired is message lock has already expired.
:raises: ~azure.servicebus.common.errors.SessionLockExpired if session lock has already expired.
:raises: ~azure.servicebus.common.errors.MessageAlreadySettled is message has already been settled.
:raises: ~azure.servicebus.exceptions.MessageLockExpired is message lock has already expired.
:raises: ~azure.servicebus.exceptions.SessionLockExpired if session lock has already expired.
:raises: ~azure.servicebus.exceptions.MessageAlreadySettled is message has already been settled.
"""
try:
if self._receiver.session: # pylint: disable=protected-access
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -239,14 +239,19 @@ def from_connection_string(
return cls(**constructor_args)

async def send(self, message):
# type: (Union[Message, BatchMessage]) -> None
# type: (Union[Message, BatchMessage, List[Message]]) -> None
"""Sends message and blocks until acknowledgement is received or operation times out.

If a list of messages was provided, attempts to send them as a single batch, throwing a
`ValueError` if they cannot fit in a single batch.

:param message: The ServiceBus message to be sent.
:type message: ~azure.servicebus.Message or ~azure.servicebus.BatchMessage
:type message: ~azure.servicebus.Message or ~azure.servicebus.BatchMessage or list[~azure.servicebus.Message]
:rtype: None
:raises: ~azure.servicebus.common.errors.MessageSendFailed if the message fails to
send or ~azure.servicebus.common.errors.OperationTimeoutError if sending times out.
:raises: :class: ~azure.servicebus.exceptions.MessageSendFailed if the message fails to
send
:class: ~azure.servicebus.exceptions.OperationTimeoutError if sending times out.
:class: `ValueError` if list of messages is provided and cannot fit in a batch.

.. admonition:: Example:

Expand All @@ -258,6 +263,13 @@ async def send(self, message):
:caption: Send message.

"""
try:
batch = await self.create_batch()
batch._from_list(message)
message = batch
except TypeError: # Message was not a list or generator.
pass

await self._do_retryable_operation(
self._send,
message=message,
Expand Down
Loading