diff --git a/sdk/servicebus/azure-servicebus/azure/servicebus/_common/message.py b/sdk/servicebus/azure-servicebus/azure/servicebus/_common/message.py index b973abffa6e4..45256ed7f68d 100644 --- a/sdk/servicebus/azure-servicebus/azure/servicebus/_common/message.py +++ b/sdk/servicebus/azure-servicebus/azure/servicebus/_common/message.py @@ -6,10 +6,12 @@ import datetime import uuid +import functools +import logging from typing import Optional, List, Union, Generator import uamqp -from uamqp import types +from uamqp import types, errors from .constants import ( _BATCH_MESSAGE_OVERHEAD_COST, @@ -34,7 +36,8 @@ MESSAGE_DEAD_LETTER, MESSAGE_ABANDON, MESSAGE_DEFER, - MESSAGE_RENEW_LOCK + MESSAGE_RENEW_LOCK, + DEADLETTERNAME ) from ..exceptions import ( MessageAlreadySettled, @@ -44,6 +47,8 @@ ) from .utils import utc_from_timestamp, utc_now +_LOGGER = logging.getLogger(__name__) + class Message(object): # pylint: disable=too-many-public-methods,too-many-instance-attributes """A Service Bus Message. @@ -436,9 +441,10 @@ class ReceivedMessage(PeekMessage): :dedent: 4 :caption: Checking the properties on a received message. """ - def __init__(self, message, mode=ReceiveSettleMode.PeekLock): + def __init__(self, message, mode=ReceiveSettleMode.PeekLock, **kwargs): super(ReceivedMessage, self).__init__(message=message) self._settled = (mode == ReceiveSettleMode.ReceiveAndDelete) + self._is_deferred_message = kwargs.get("is_deferred_message", False) self.auto_renew_error = None def _is_live(self, action): @@ -458,6 +464,69 @@ def _is_live(self, action): except TypeError: pass + def _settle_message( + self, + settle_operation, + dead_letter_details=None + ): + try: + if not self._is_deferred_message: + try: + self._settle_via_receiver_link(settle_operation, dead_letter_details)() + return + except RuntimeError as exception: + _LOGGER.info( + "Message settling: %r has encountered an exception (%r)." + "Trying to settle through management link", + settle_operation, + exception + ) + self._settle_via_mgmt_link(settle_operation, dead_letter_details)() + except Exception as e: + raise MessageSettleFailed(settle_operation, e) + + def _settle_via_mgmt_link(self, settle_operation, dead_letter_details=None): + # pylint: disable=protected-access + if settle_operation == MESSAGE_COMPLETE: + return functools.partial( + self._receiver._settle_message, + SETTLEMENT_COMPLETE, + [self.lock_token], + ) + if settle_operation == MESSAGE_ABANDON: + return functools.partial( + self._receiver._settle_message, + SETTLEMENT_ABANDON, + [self.lock_token], + ) + if settle_operation == MESSAGE_DEAD_LETTER: + return functools.partial( + self._receiver._settle_message, + SETTLEMENT_DEADLETTER, + [self.lock_token], + dead_letter_details=dead_letter_details + ) + if settle_operation == MESSAGE_DEFER: + return functools.partial( + self._receiver._settle_message, + SETTLEMENT_DEFER, + [self.lock_token], + ) + raise ValueError("Unsupported settle operation type: {}".format(settle_operation)) + + def _settle_via_receiver_link(self, settle_operation, dead_letter_details=None): + if settle_operation == MESSAGE_COMPLETE: + return functools.partial(self.message.accept) + if settle_operation == MESSAGE_ABANDON: + return functools.partial(self.message.modify, True, False) + if settle_operation == MESSAGE_DEAD_LETTER: + # note: message.reject() can not set reason and description properly due to the issue + # https://github.com/Azure/azure-uamqp-python/issues/155 + return functools.partial(self.message.reject, condition=DEADLETTERNAME) + if settle_operation == MESSAGE_DEFER: + return functools.partial(self.message.modify, True, True) + raise ValueError("Unsupported settle operation type: {}".format(settle_operation)) + @property def settled(self): # type: () -> bool @@ -529,11 +598,9 @@ def complete(self): :raises: ~azure.servicebus.common.errors.SessionLockExpired if session lock has already expired. :raises: ~azure.servicebus.common.errors.MessageSettleFailed if message settle operation fails. """ + # pylint: disable=protected-access self._is_live(MESSAGE_COMPLETE) - try: - self._receiver._settle_message(SETTLEMENT_COMPLETE, [self.lock_token]) # pylint: disable=protected-access - except Exception as e: - raise MessageSettleFailed(MESSAGE_COMPLETE, e) + self._settle_message(MESSAGE_COMPLETE) self._settled = True def dead_letter(self, reason=None, description=None): @@ -554,17 +621,12 @@ def dead_letter(self, reason=None, description=None): """ # pylint: disable=protected-access self._is_live(MESSAGE_DEAD_LETTER) + details = { MGMT_REQUEST_DEAD_LETTER_REASON: str(reason) if reason else "", MGMT_REQUEST_DEAD_LETTER_DESCRIPTION: str(description) if description else ""} - try: - self._receiver._settle_message( - SETTLEMENT_DEADLETTER, - [self.lock_token], - dead_letter_details=details - ) - except Exception as e: - raise MessageSettleFailed(MESSAGE_DEAD_LETTER, e) + + self._settle_message(MESSAGE_DEAD_LETTER, dead_letter_details=details) self._settled = True def abandon(self): @@ -579,11 +641,9 @@ def abandon(self): :raises: ~azure.servicebus.common.errors.SessionLockExpired if session lock has already expired. :raises: ~azure.servicebus.common.errors.MessageSettleFailed if message settle operation fails. """ + # pylint: disable=protected-access self._is_live(MESSAGE_ABANDON) - try: - self._receiver._settle_message(SETTLEMENT_ABANDON, [self.lock_token]) # pylint: disable=protected-access - except Exception as e: - raise MessageSettleFailed(MESSAGE_ABANDON, e) + self._settle_message(MESSAGE_ABANDON) self._settled = True def defer(self): @@ -600,10 +660,7 @@ def defer(self): :raises: ~azure.servicebus.common.errors.MessageSettleFailed if message settle operation fails. """ self._is_live(MESSAGE_DEFER) - try: - self._receiver._settle_message(SETTLEMENT_DEFER, [self.lock_token]) # pylint: disable=protected-access - except Exception as e: - raise MessageSettleFailed(MESSAGE_DEFER, e) + self._settle_message(MESSAGE_DEFER) self._settled = True def renew_lock(self): diff --git a/sdk/servicebus/azure-servicebus/azure/servicebus/_common/mgmt_handlers.py b/sdk/servicebus/azure-servicebus/azure/servicebus/_common/mgmt_handlers.py index b42d2cd955cb..48ebdfe156ec 100644 --- a/sdk/servicebus/azure-servicebus/azure/servicebus/_common/mgmt_handlers.py +++ b/sdk/servicebus/azure-servicebus/azure/servicebus/_common/mgmt_handlers.py @@ -69,7 +69,7 @@ def deferred_message_op( parsed = [] for m in message.get_data()[b'messages']: wrapped = uamqp.Message.decode_from_bytes(bytearray(m[b'message'])) - parsed.append(message_type(wrapped, mode)) + parsed.append(message_type(wrapped, mode, is_deferred_message=True)) return parsed if status_code in [202, 204]: return [] diff --git a/sdk/servicebus/azure-servicebus/azure/servicebus/aio/_async_message.py b/sdk/servicebus/azure-servicebus/azure/servicebus/aio/_async_message.py index fbbeb290421c..39886d8c8de3 100644 --- a/sdk/servicebus/azure-servicebus/azure/servicebus/aio/_async_message.py +++ b/sdk/servicebus/azure-servicebus/azure/servicebus/aio/_async_message.py @@ -3,18 +3,13 @@ # Licensed under the MIT License. See License.txt in the project root for # license information. # ------------------------------------------------------------------------- +import logging from typing import Optional from .._common import message as sync_message from .._common.constants import ( - SETTLEMENT_ABANDON, - SETTLEMENT_COMPLETE, - SETTLEMENT_DEFER, - SETTLEMENT_DEADLETTER, ReceiveSettleMode, MGMT_RESPONSE_MESSAGE_EXPIRATION, - MGMT_REQUEST_DEAD_LETTER_REASON, - MGMT_REQUEST_DEAD_LETTER_DESCRIPTION, MESSAGE_COMPLETE, MESSAGE_DEAD_LETTER, MESSAGE_ABANDON, @@ -24,15 +19,41 @@ from .._common.utils import get_running_loop, utc_from_timestamp from ..exceptions import MessageSettleFailed +_LOGGER = logging.getLogger(__name__) + class ReceivedMessage(sync_message.ReceivedMessage): """A Service Bus Message received from service side. """ - def __init__(self, message, mode=ReceiveSettleMode.PeekLock, loop=None): + def __init__(self, message, mode=ReceiveSettleMode.PeekLock, loop=None, **kwargs): self._loop = loop or get_running_loop() - super(ReceivedMessage, self).__init__(message=message, mode=mode) + super(ReceivedMessage, self).__init__(message=message, mode=mode, **kwargs) + + async def _settle_message( + self, + settle_operation, + dead_letter_details=None + ): + try: + if not self._is_deferred_message: + try: + await self._loop.run_in_executor( + None, + self._settle_via_receiver_link(settle_operation, dead_letter_details) + ) + return + except RuntimeError as exception: + _LOGGER.info( + "Message settling: %r has encountered an exception (%r)." + "Trying to settle through management link", + settle_operation, + exception + ) + await self._settle_via_mgmt_link(settle_operation, dead_letter_details)() + except Exception as e: + raise MessageSettleFailed(settle_operation, e) async def complete(self): # type: () -> None @@ -48,10 +69,7 @@ async def complete(self): """ # pylint: disable=protected-access self._is_live(MESSAGE_COMPLETE) - try: - await self._receiver._settle_message(SETTLEMENT_COMPLETE, [self.lock_token]) - except Exception as e: - raise MessageSettleFailed(MESSAGE_COMPLETE, e) + await self._settle_message(MESSAGE_COMPLETE) self._settled = True async def dead_letter(self, reason=None, description=None): @@ -71,17 +89,7 @@ async def dead_letter(self, reason=None, description=None): """ # pylint: disable=protected-access self._is_live(MESSAGE_DEAD_LETTER) - details = { - MGMT_REQUEST_DEAD_LETTER_REASON: str(reason) if reason else "", - MGMT_REQUEST_DEAD_LETTER_DESCRIPTION: str(description) if description else ""} - try: - await self._receiver._settle_message( - SETTLEMENT_DEADLETTER, - [self.lock_token], - dead_letter_details=details - ) - except Exception as e: - raise MessageSettleFailed(MESSAGE_DEAD_LETTER, e) + await self._settle_message(MESSAGE_DEAD_LETTER) self._settled = True async def abandon(self): @@ -95,10 +103,7 @@ async def abandon(self): """ # pylint: disable=protected-access self._is_live(MESSAGE_ABANDON) - try: - await self._receiver._settle_message(SETTLEMENT_ABANDON, [self.lock_token]) - except Exception as e: - raise MessageSettleFailed(MESSAGE_ABANDON, e) + await self._settle_message(MESSAGE_ABANDON) self._settled = True async def defer(self): @@ -112,10 +117,7 @@ async def defer(self): """ # pylint: disable=protected-access self._is_live(MESSAGE_DEFER) - try: - await self._receiver._settle_message(SETTLEMENT_DEFER, [self.lock_token]) - except Exception as e: - raise MessageSettleFailed(MESSAGE_DEFER, e) + await self._settle_message(MESSAGE_DEFER) self._settled = True async def renew_lock(self): diff --git a/sdk/servicebus/azure-servicebus/tests/async_tests/test_queues_async.py b/sdk/servicebus/azure-servicebus/tests/async_tests/test_queues_async.py index d57a8b8c489d..1d6bbf6d6bee 100644 --- a/sdk/servicebus/azure-servicebus/tests/async_tests/test_queues_async.py +++ b/sdk/servicebus/azure-servicebus/tests/async_tests/test_queues_async.py @@ -1076,3 +1076,23 @@ def test_queue_message_http_proxy_setting(self): receiver = sb_client.get_queue_receiver(queue_name="mock") assert receiver._config.http_proxy == http_proxy assert receiver._config.transport_type == TransportType.AmqpOverWebsocket + + @pytest.mark.liveTest + @pytest.mark.live_test_only + @CachedResourceGroupPreparer(name_prefix='servicebustest') + @CachedServiceBusNamespacePreparer(name_prefix='servicebustest') + @ServiceBusQueuePreparer(name_prefix='servicebustest', dead_lettering_on_message_expiration=True) + async def test_queue_message_settle_through_mgmt_link_due_to_broken_receiver_link(self, servicebus_namespace_connection_string, servicebus_queue, **kwargs): + async with ServiceBusClient.from_connection_string( + servicebus_namespace_connection_string, + logging_enable=False) as sb_client: + + async with sb_client.get_queue_sender(servicebus_queue.name) as sender: + message = Message("Test") + await sender.send(message) + + async with sb_client.get_queue_receiver(servicebus_queue.name) as receiver: + messages = await receiver.receive(max_wait_time=5) + await receiver._handler.message_handler.destroy_async() # destroy the underlying receiver link + assert len(messages) == 1 + await messages[0].complete() diff --git a/sdk/servicebus/azure-servicebus/tests/test_queues.py b/sdk/servicebus/azure-servicebus/tests/test_queues.py index 61b818438692..590666d580bf 100644 --- a/sdk/servicebus/azure-servicebus/tests/test_queues.py +++ b/sdk/servicebus/azure-servicebus/tests/test_queues.py @@ -1201,3 +1201,23 @@ def test_queue_message_http_proxy_setting(self): receiver = sb_client.get_queue_receiver(queue_name="mock") assert receiver._config.http_proxy == http_proxy assert receiver._config.transport_type == TransportType.AmqpOverWebsocket + + @pytest.mark.liveTest + @pytest.mark.live_test_only + @CachedResourceGroupPreparer(name_prefix='servicebustest') + @CachedServiceBusNamespacePreparer(name_prefix='servicebustest') + @ServiceBusQueuePreparer(name_prefix='servicebustest', dead_lettering_on_message_expiration=True) + def test_queue_message_settle_through_mgmt_link_due_to_broken_receiver_link(self, servicebus_namespace_connection_string, servicebus_queue, **kwargs): + with ServiceBusClient.from_connection_string( + servicebus_namespace_connection_string, + logging_enable=False) as sb_client: + + with sb_client.get_queue_sender(servicebus_queue.name) as sender: + message = Message("Test") + sender.send(message) + + with sb_client.get_queue_receiver(servicebus_queue.name) as receiver: + messages = receiver.receive(max_wait_time=5) + receiver._handler.message_handler.destroy() # destroy the underlying receiver link + assert len(messages) == 1 + messages[0].complete()