diff --git a/sdk/servicebus/azure-servicebus/CHANGELOG.md b/sdk/servicebus/azure-servicebus/CHANGELOG.md index 13a9cb2a79e4..f424295345a0 100644 --- a/sdk/servicebus/azure-servicebus/CHANGELOG.md +++ b/sdk/servicebus/azure-servicebus/CHANGELOG.md @@ -2,6 +2,29 @@ ## 7.0.0b5 (Unreleased) +**New Features** + +* Added new properties to Message, PeekMessage and ReceivedMessage: `content_type`, `correlation_id`, `label`, +`message_id`, `reply_to`, `reply_to_session_id` and `to`. Please refer to the docstring for further information. + +* Add new properties to PeekedMessaged and ReceivedMessage: `enqueued_sequence_number`, `dead_letter_error_description`, +`dead_letter_reason`, `dead_letter_source`, `delivery_count` and `expires_at_utc`. Please refer to the docstring for further information. + +**Breaking Changes** + +* Removed/Renamed several properties and instance variables on Message (the changes applied to the inherited Message type PeekMessage and ReceivedMessage). + - Renamed property `user_properties` to `properties` + - The original instance variable `properties` which represents the AMQP properties now becomes an internal instance variable `_amqp_properties`. + - Removed property `enqueue_sequence_number`. + - Removed property `annotations`. + - Removed instance variable `header`. + +* Removed several properties and instance variables on PeekMessage and ReceivedMessage. + - Removed proeprty `partition_id` on both type. + - Removed instance variable `received_timestamp_utc` on both type. + - Removed property `settled` on `PeekMessage`. + - Removed property `expired` on `ReceivedMessage`. + ## 7.0.0b4 (2020-07-06) diff --git a/sdk/servicebus/azure-servicebus/azure/servicebus/_common/constants.py b/sdk/servicebus/azure-servicebus/azure/servicebus/_common/constants.py index 7b67fe37a32b..00182a290432 100644 --- a/sdk/servicebus/azure-servicebus/azure/servicebus/_common/constants.py +++ b/sdk/servicebus/azure-servicebus/azure/servicebus/_common/constants.py @@ -3,10 +3,9 @@ # Licensed under the MIT License. See License.txt in the project root for # license information. # ------------------------------------------------------------------------- - from enum import Enum -from uamqp import constants +from uamqp import constants, types VENDOR = b"com.microsoft" DATETIMEOFFSET_EPOCH = 621355968000000000 @@ -97,6 +96,10 @@ _X_OPT_LOCKED_UNTIL = b'x-opt-locked-until' _X_OPT_LOCK_TOKEN = b'x-opt-lock-token' _X_OPT_SCHEDULED_ENQUEUE_TIME = b'x-opt-scheduled-enqueue-time' +_X_OPT_DEAD_LETTER_SOURCE = b'x-opt-deadletter-source' + +PROPERTIES_DEAD_LETTER_REASON = b'DeadLetterReason' +PROPERTIES_DEAD_LETTER_ERROR_DESCRIPTION = b'DeadLetterErrorDescription' DEAD_LETTER_QUEUE_SUFFIX = '/$DeadLetterQueue' @@ -112,4 +115,15 @@ class SessionFilter(Enum): NextAvailable = 0 +ANNOTATION_SYMBOL_PARTITION_KEY = types.AMQPSymbol(_X_OPT_PARTITION_KEY) +ANNOTATION_SYMBOL_VIA_PARTITION_KEY = types.AMQPSymbol(_X_OPT_VIA_PARTITION_KEY) +ANNOTATION_SYMBOL_SCHEDULED_ENQUEUE_TIME = types.AMQPSymbol(_X_OPT_SCHEDULED_ENQUEUE_TIME) + +ANNOTATION_SYMBOL_KEY_MAP = { + _X_OPT_PARTITION_KEY: ANNOTATION_SYMBOL_PARTITION_KEY, + _X_OPT_VIA_PARTITION_KEY: ANNOTATION_SYMBOL_VIA_PARTITION_KEY, + _X_OPT_SCHEDULED_ENQUEUE_TIME: ANNOTATION_SYMBOL_SCHEDULED_ENQUEUE_TIME +} + + NEXT_AVAILABLE = SessionFilter.NextAvailable diff --git a/sdk/servicebus/azure-servicebus/azure/servicebus/_common/message.py b/sdk/servicebus/azure-servicebus/azure/servicebus/_common/message.py index 80361f35e506..18c0e6bc7c18 100644 --- a/sdk/servicebus/azure-servicebus/azure/servicebus/_common/message.py +++ b/sdk/servicebus/azure-servicebus/azure/servicebus/_common/message.py @@ -1,17 +1,17 @@ -# ------------------------------------------------------------------------ +# ------------------------------------------------------------------------- # Copyright (c) Microsoft Corporation. All rights reserved. # Licensed under the MIT License. See License.txt in the project root for # license information. # ------------------------------------------------------------------------- +# pylint: disable=too-many-lines import datetime import uuid import functools import logging -from typing import Optional, List, Union, Iterable, TYPE_CHECKING, Callable, Dict, Any +from typing import Optional, List, Union, Iterable, TYPE_CHECKING, Callable import uamqp.message -from uamqp import types from .constants import ( _BATCH_MESSAGE_OVERHEAD_COST, @@ -23,12 +23,12 @@ _X_OPT_ENQUEUED_TIME, _X_OPT_SEQUENCE_NUMBER, _X_OPT_ENQUEUE_SEQUENCE_NUMBER, - _X_OPT_PARTITION_ID, _X_OPT_PARTITION_KEY, _X_OPT_VIA_PARTITION_KEY, _X_OPT_LOCKED_UNTIL, _X_OPT_LOCK_TOKEN, _X_OPT_SCHEDULED_ENQUEUE_TIME, + _X_OPT_DEAD_LETTER_SOURCE, MGMT_RESPONSE_MESSAGE_EXPIRATION, MGMT_REQUEST_DEAD_LETTER_REASON, MGMT_REQUEST_DEAD_LETTER_DESCRIPTION, @@ -39,7 +39,13 @@ MESSAGE_ABANDON, MESSAGE_DEFER, MESSAGE_RENEW_LOCK, - DEADLETTERNAME + DEADLETTERNAME, + PROPERTIES_DEAD_LETTER_REASON, + PROPERTIES_DEAD_LETTER_ERROR_DESCRIPTION, + ANNOTATION_SYMBOL_PARTITION_KEY, + ANNOTATION_SYMBOL_VIA_PARTITION_KEY, + ANNOTATION_SYMBOL_SCHEDULED_ENQUEUE_TIME, + ANNOTATION_SYMBOL_KEY_MAP ) from ..exceptions import ( MessageAlreadySettled, @@ -58,17 +64,24 @@ class Message(object): # pylint: disable=too-many-public-methods,too-many-instance-attributes """A Service Bus Message. - :ivar properties: Properties of the internal AMQP message object. - :vartype properties: ~uamqp.message.MessageProperties - :ivar header: Header of the internal AMQP message object. - :vartype header: ~uamqp.message.MessageHeader - :ivar message: Internal AMQP message object. - :vartype message: ~uamqp.message.Message - :param body: The data to send in a single message. :type body: str or bytes + + :keyword dict properties: The user defined properties on the message. + :keyword str session_id: The session identifier of the message for a sessionful entity. + :keyword str message_id: The id to identify the message. + :keyword datetime.datetime scheduled_enqueue_time_utc: The utc scheduled enqueue time to the message. + :keyword datetime.timedelta time_to_live: The life duration of a message. + :keyword str content_type: The content type descriptor. + :keyword str correlation_id: The correlation identifier. + :keyword str label: The application specific label. + :keyword str partition_key: The partition key for sending a message to a partitioned entity. + :keyword str via_partition_key: The partition key for sending a message into an entity via a partitioned + transfer queue. + :keyword str to: The `to` address used for auto_forward chaining scenarios. + :keyword str reply_to: The address of an entity to send replies to. + :keyword str reply_to_session_id: The session identifier augmenting the `reply_to` address. :keyword str encoding: The encoding for string data. Default is UTF-8. - :keyword str session_id: An optional session ID for the message to be sent. .. admonition:: Example: @@ -82,170 +95,190 @@ class Message(object): # pylint: disable=too-many-public-methods,too-many-insta """ def __init__(self, body, **kwargs): - subject = kwargs.pop('subject', None) # Although we might normally thread through **kwargs this causes # problems as MessageProperties won't absorb spurious args. self._encoding = kwargs.pop("encoding", 'UTF-8') - self.properties = uamqp.message.MessageProperties(encoding=self._encoding, subject=subject) - self.header = uamqp.message.MessageHeader() - self._annotations = {} - self._app_properties = {} + self._amqp_properties = uamqp.message.MessageProperties(encoding=self._encoding) + self._amqp_header = uamqp.message.MessageHeader() - self.session_id = kwargs.get("session_id", None) if 'message' in kwargs: self.message = kwargs['message'] - self._annotations = self.message.annotations - self._app_properties = self.message.application_properties - self.properties = self.message.properties - self.header = self.message.header + self._amqp_properties = self.message.properties + self._amqp_header = self.message.header else: self._build_message(body) + self.properties = kwargs.pop("properties", None) + self.session_id = kwargs.pop("session_id", None) + self.message_id = kwargs.get("message_id", None) + self.content_type = kwargs.pop("content_type", None) + self.correlation_id = kwargs.pop("correlation_id", None) + self.to = kwargs.pop("to", None) + self.reply_to = kwargs.pop("reply_to", None) + self.reply_to_session_id = kwargs.pop("reply_to_session_id", None) + self.label = kwargs.pop("label", None) + self.scheduled_enqueue_time_utc = kwargs.pop("scheduled_enqueue_time_utc", None) + self.time_to_live = kwargs.pop("time_to_live", None) + self.partition_key = kwargs.pop("partition_key", None) + self.via_partition_key = kwargs.pop("via_partition_key", None) def __str__(self): return str(self.message) def _build_message(self, body): if isinstance(body, list) and body: # TODO: This only works for a list of bytes/strings - self.message = uamqp.Message(body[0], properties=self.properties, header=self.header) + self.message = uamqp.Message(body[0], properties=self._amqp_properties, header=self._amqp_header) for more in body[1:]: self.message._body.append(more) # pylint: disable=protected-access elif body is None: raise ValueError("Message body cannot be None.") else: - self.message = uamqp.Message(body, properties=self.properties, header=self.header) + self.message = uamqp.Message(body, properties=self._amqp_properties, header=self._amqp_header) + + def _set_message_annotations(self, key, value): + if not self.message.annotations: + self.message.annotations = {} + + if isinstance(self, ReceivedMessage): + try: + del self.message.annotations[key] + except KeyError: + pass + + if value is None: + try: + del self.message.annotations[ANNOTATION_SYMBOL_KEY_MAP[key]] + except KeyError: + pass + else: + self.message.annotations[ANNOTATION_SYMBOL_KEY_MAP[key]] = value @property def session_id(self): # type: () -> str - """The session id of the message + """The session identifier of the message for a sessionful entity. + + For sessionful entities, this application-defined value specifies the session affiliation of the message. + Messages with the same session identifier are subject to summary locking and enable exact in-order + processing and demultiplexing. For non-sessionful entities, this value is ignored. + + See Message Sessions in `https://docs.microsoft.com/azure/service-bus-messaging/message-sessions`. :rtype: str """ try: - return self.properties.group_id.decode('UTF-8') + return self._amqp_properties.group_id.decode('UTF-8') except (AttributeError, UnicodeDecodeError): - return self.properties.group_id + return self._amqp_properties.group_id @session_id.setter def session_id(self, value): - """Set the session id on the message. - - :param value: The session id for the message. - :type value: str - """ - self.properties.group_id = value - - @property - def annotations(self): - # type: () -> dict - """The annotations of the message. - - :rtype: dict - """ - return self.message.annotations - - @annotations.setter - def annotations(self, value): - """Set the annotations on the message. - - :param value: The annotations for the Message. - :type value: dict - """ - self.message.annotations = value + # type: (str) -> None + self._amqp_properties.group_id = value @property - def user_properties(self): + def properties(self): # type: () -> dict - """User defined properties on the message. + """The user defined properties on the message. :rtype: dict """ return self.message.application_properties - @user_properties.setter - def user_properties(self, value): - """User defined properties on the message. - - :param value: The application properties for the Message. - :type value: dict - """ + @properties.setter + def properties(self, value): + # type: (dict) -> None self.message.application_properties = value - @property - def enqueue_sequence_number(self): - # type: () -> Optional[int] - """ - - :rtype: int - """ - if self.message.annotations: - return self.message.annotations.get(_X_OPT_ENQUEUE_SEQUENCE_NUMBER) - return None - - @enqueue_sequence_number.setter - def enqueue_sequence_number(self, value): - if not self.message.annotations: - self.message.annotations = {} - self.message.annotations[types.AMQPSymbol(_X_OPT_ENQUEUE_SEQUENCE_NUMBER)] = value - @property def partition_key(self): # type: () -> Optional[str] - """ + """ The partition key for sending a message to a partitioned entity. + + Setting this value enables assigning related messages to the same internal partition, so that submission + sequence order is correctly recorded. + The partition is chosen by a hash function over this value and cannot be chosen directly. + + See Partitioned queues and topics in + `https://docs.microsoft.com/azure/service-bus-messaging/service-bus-partitioning`. :rtype: str """ - if self.message.annotations: - return self.message.annotations.get(_X_OPT_PARTITION_KEY) - return None + p_key = None + try: + p_key = self.message.annotations.get(_X_OPT_PARTITION_KEY) or \ + self.message.annotations.get(ANNOTATION_SYMBOL_PARTITION_KEY) + return p_key.decode('UTF-8') + except (AttributeError, UnicodeDecodeError): + return p_key @partition_key.setter def partition_key(self, value): - if not self.message.annotations: - self.message.annotations = {} - self.message.annotations[types.AMQPSymbol(_X_OPT_PARTITION_KEY)] = value + # type: (str) -> None + self._set_message_annotations(_X_OPT_PARTITION_KEY, value) @property def via_partition_key(self): # type: () -> Optional[str] - """ + """ The partition key for sending a message into an entity via a partitioned transfer queue. + + If a message is sent via a transfer queue in the scope of a transaction, this value selects the transfer + queue partition: This is functionally equivalent to `partition_key` and ensures that messages are kept + together and in order as they are transferred. + + See Transfers and Send Via in + `https://docs.microsoft.com/azure/service-bus-messaging/service-bus-transactions#transfers-and-send-via`. :rtype: str """ - if self.message.annotations: - return self.message.annotations.get(_X_OPT_VIA_PARTITION_KEY) - return None + via_p_key = None + try: + via_p_key = self.message.annotations.get(_X_OPT_VIA_PARTITION_KEY) or \ + self.message.annotations.get(ANNOTATION_SYMBOL_VIA_PARTITION_KEY) + return via_p_key.decode('UTF-8') + except (AttributeError, UnicodeDecodeError): + return via_p_key @via_partition_key.setter def via_partition_key(self, value): - if not self.message.annotations: - self.message.annotations = {} - self.message.annotations[types.AMQPSymbol(_X_OPT_VIA_PARTITION_KEY)] = value + # type: (str) -> None + self._set_message_annotations(_X_OPT_VIA_PARTITION_KEY, value) @property def time_to_live(self): # type: () -> Optional[datetime.timedelta] - """ + """The life duration of a message. + + This value is the relative duration after which the message expires, starting from the instant the message + has been accepted and stored by the broker, as captured in `enqueued_time_utc`. + When not set explicitly, the assumed value is the DefaultTimeToLive for the respective queue or topic. + A message-level time-to-live value cannot be longer than the entity's time-to-live setting and it is silently + adjusted if it does. + + See Expiration in `https://docs.microsoft.com/azure/service-bus-messaging/message-expiration` :rtype: ~datetime.timedelta """ - if self.header and self.header.time_to_live: - return datetime.timedelta(milliseconds=self.header.time_to_live) + if self._amqp_header and self._amqp_header.time_to_live: + return datetime.timedelta(milliseconds=self._amqp_header.time_to_live) return None @time_to_live.setter def time_to_live(self, value): - if not self.header: - self.header = uamqp.message.MessageHeader() - if isinstance(value, datetime.timedelta): - self.header.time_to_live = value.seconds * 1000 + # type: (datetime.timedelta) -> None + if not self._amqp_header: + self._amqp_header = uamqp.message.MessageHeader() + if value is None: + self._amqp_header.time_to_live = value + elif isinstance(value, datetime.timedelta): + self._amqp_header.time_to_live = value.seconds * 1000 else: - self.header.time_to_live = int(value) * 1000 + self._amqp_header.time_to_live = int(value) * 1000 @property def scheduled_enqueue_time_utc(self): # type: () -> Optional[datetime.datetime] - """Get or set the utc scheduled enqueue time to the message. + """The utc scheduled enqueue time to the message. + This property can be used for scheduling when sending a message through `ServiceBusSender.send` method. If cancelling scheduled messages is required, you should use the `ServiceBusSender.schedule` method, which returns sequence numbers that can be used for future cancellation. @@ -254,20 +287,22 @@ def scheduled_enqueue_time_utc(self): :rtype: ~datetime.datetime """ if self.message.annotations: - timestamp = self.message.annotations.get(_X_OPT_SCHEDULED_ENQUEUE_TIME) + timestamp = self.message.annotations.get(_X_OPT_SCHEDULED_ENQUEUE_TIME) or \ + self.message.annotations.get(ANNOTATION_SYMBOL_SCHEDULED_ENQUEUE_TIME) if timestamp: - in_seconds = timestamp/1000.0 - return utc_from_timestamp(in_seconds) + try: + in_seconds = timestamp/1000.0 + return utc_from_timestamp(in_seconds) + except TypeError: + return timestamp return None @scheduled_enqueue_time_utc.setter def scheduled_enqueue_time_utc(self, value): # type: (datetime.datetime) -> None - if not self.properties.message_id: - self.properties.message_id = str(uuid.uuid4()) - if not self.message.annotations: - self.message.annotations = {} - self.message.annotations[types.AMQPSymbol(_X_OPT_SCHEDULED_ENQUEUE_TIME)] = value + if not self._amqp_properties.message_id: + self._amqp_properties.message_id = str(uuid.uuid4()) + self._set_message_annotations(_X_OPT_SCHEDULED_ENQUEUE_TIME, value) @property def body(self): @@ -278,6 +313,165 @@ def body(self): """ return self.message.get_data() + @property + def content_type(self): + # type: () -> str + """The content type descriptor. + + Optionally describes the payload of the message, with a descriptor following the format of RFC2045, Section 5, + for example "application/json". + + :rtype: str + """ + try: + return self._amqp_properties.content_type.decode('UTF-8') + except (AttributeError, UnicodeDecodeError): + return self._amqp_properties.content_type + + @content_type.setter + def content_type(self, val): + # type: (str) -> None + self._amqp_properties.content_type = val + + @property + def correlation_id(self): + # type: () -> str + # pylint: disable=line-too-long + """The correlation identifier. + + Allows an application to specify a context for the message for the purposes of correlation, for example + reflecting the MessageId of a message that is being replied to. + + See Message Routing and Correlation in + `https://docs.microsoft.com/azure/service-bus-messaging/service-bus-messages-payloads?#message-routing-and-correlation`. + + :rtype: str + """ + try: + return self._amqp_properties.correlation_id.decode('UTF-8') + except (AttributeError, UnicodeDecodeError): + return self._amqp_properties.correlation_id + + @correlation_id.setter + def correlation_id(self, val): + # type: (str) -> None + self._amqp_properties.correlation_id = val + + @property + def label(self): + # type: () -> str + """The application specific label. + + This property enables the application to indicate the purpose of the message to the receiver in a standardized + fashion, similar to an email subject line. + + :rtype: str + """ + try: + return self._amqp_properties.subject.decode('UTF-8') + except (AttributeError, UnicodeDecodeError): + return self._amqp_properties.subject + + @label.setter + def label(self, val): + # type: (str) -> None + self._amqp_properties.subject = val + + @property + def message_id(self): + # type: () -> str + """The id to identify the message. + + The message identifier is an application-defined value that uniquely identifies the message and its payload. + The identifier is a free-form string and can reflect a GUID or an identifier derived from the + application context. If enabled, the duplicate detection (see + `https://docs.microsoft.com/azure/service-bus-messaging/duplicate-detection`) + feature identifies and removes second and further submissions of messages with the same message id. + + :rtype: str + """ + try: + return self._amqp_properties.message_id.decode('UTF-8') + except (AttributeError, UnicodeDecodeError): + return self._amqp_properties.message_id + + @message_id.setter + def message_id(self, val): + # type: (str) -> None + self._amqp_properties.message_id = val + + @property + def reply_to(self): + # type: () -> str + # pylint: disable=line-too-long + """The address of an entity to send replies to. + + This optional and application-defined value is a standard way to express a reply path to the receiver of + the message. When a sender expects a reply, it sets the value to the absolute or relative path of the queue + or topic it expects the reply to be sent to. + + See Message Routing and Correlation in + `https://docs.microsoft.com/azure/service-bus-messaging/service-bus-messages-payloads?#message-routing-and-correlation`. + + :rtype: str + """ + try: + return self._amqp_properties.reply_to.decode('UTF-8') + except (AttributeError, UnicodeDecodeError): + return self._amqp_properties.reply_to + + @reply_to.setter + def reply_to(self, val): + # type: (str) -> None + self._amqp_properties.reply_to = val + + @property + def reply_to_session_id(self): + # type: () -> str + # pylint: disable=line-too-long + """The session identifier augmenting the `reply_to` address. + + This value augments the `reply_to` information and specifies which session id should be set for the reply + when sent to the reply entity. + + See Message Routing and Correlation in + `https://docs.microsoft.com/azure/service-bus-messaging/service-bus-messages-payloads?#message-routing-and-correlation`. + + :rtype: str + """ + try: + return self._amqp_properties.reply_to_group_id.decode('UTF-8') + except (AttributeError, UnicodeDecodeError): + return self._amqp_properties.reply_to_group_id + + @reply_to_session_id.setter + def reply_to_session_id(self, val): + # type: (str) -> None + self._amqp_properties.reply_to_group_id = val + + @property + def to(self): + # type: () -> str + """The `to` address. + + This property is reserved for future use in routing scenarios and presently ignored by the broker itself. + Applications can use this value in rule-driven auto-forward chaining scenarios to indicate the intended + logical destination of the message. + + See https://docs.microsoft.com/azure/service-bus-messaging/service-bus-auto-forwarding for more details. + + :rtype: str + """ + try: + return self._amqp_properties.to.decode('UTF-8') + except (AttributeError, UnicodeDecodeError): + return self._amqp_properties.to + + @to.setter + def to(self, val): + # type: (str) -> None + self._amqp_properties.to = val + class BatchMessage(object): """A batch of messages. @@ -376,42 +570,89 @@ class PeekMessage(Message): A peeked message cannot be completed, abandoned, dead-lettered or deferred. It has no lock token or expiry. - :ivar received_timestamp_utc: The utc timestamp of when the message is received. - :vartype received_timestamp_utc: datetime.datetime - """ def __init__(self, message): super(PeekMessage, self).__init__(None, message=message) - self.received_timestamp_utc = utc_now() @property - def settled(self): - # type: () -> bool - """Whether the message has been settled. + def dead_letter_error_description(self): + # type: () -> Optional[str] + """ + Dead letter error description, when the message is received from a deadletter subqueue of an entity. - This will aways be `True` for a message received using ReceiveAndDelete mode, - otherwise it will be `False` until the message is completed or otherwise settled. + :rtype: str + """ + if self.message.application_properties: + try: + return self.message.application_properties.get(PROPERTIES_DEAD_LETTER_ERROR_DESCRIPTION).decode('UTF-8') + except AttributeError: + pass + return None - :rtype: bool + @property + def dead_letter_reason(self): + # type: () -> Optional[str] """ - return self.message.settled + Dead letter reason, when the message is received from a deadletter subqueue of an entity. + + :rtype: str + """ + if self.message.application_properties: + try: + return self.message.application_properties.get(PROPERTIES_DEAD_LETTER_REASON).decode('UTF-8') + except AttributeError: + pass + return None @property - def partition_id(self): + def dead_letter_source(self): # type: () -> Optional[str] """ + The name of the queue or subscription that this message was enqueued on, before it was deadlettered. + This property is only set in messages that have been dead-lettered and subsequently auto-forwarded + from the dead-letter queue to another entity. Indicates the entity in which the message was dead-lettered. + + :rtype: str + """ + if self.message.annotations: + try: + return self.message.annotations.get(_X_OPT_DEAD_LETTER_SOURCE).decode('UTF-8') + except AttributeError: + pass + return None + + @property + def delivery_count(self): + # type: () -> Optional[int] + """ + Number of deliveries that have been attempted for this message. The count is incremented + when a message lock expires or the message is explicitly abandoned by the receiver. + + :rtype: int + """ + if self._amqp_header: + return self._amqp_header.delivery_count + return None + + @property + def enqueued_sequence_number(self): + # type: () -> Optional[int] + """ + For messages that have been auto-forwarded, this property reflects the sequence number that had + first been assigned to the message at its original point of submission. :rtype: int """ if self.message.annotations: - return self.message.annotations.get(_X_OPT_PARTITION_ID) + return self.message.annotations.get(_X_OPT_ENQUEUE_SEQUENCE_NUMBER) return None @property def enqueued_time_utc(self): # type: () -> Optional[datetime.datetime] """ + The UTC datetime at which the message has been accepted and stored in the entity. :rtype: ~datetime.datetime """ @@ -422,10 +663,28 @@ def enqueued_time_utc(self): return utc_from_timestamp(in_seconds) return None + @property + def expires_at_utc(self): + # type: () -> Optional[datetime.datetime] + """ + The UTC datetime at which the message is marked for removal and no longer available for retrieval + from the entity due to expiration. Expiry is controlled by the `Message.time_to_live` property. + This property is computed from `Message.enqueued_time_utc` + `Message.time_to_live`. + + :rtype: ~datetime.datetime + """ + if self.enqueued_time_utc and self.time_to_live: + return self.enqueued_time_utc + self.time_to_live + return None + @property def sequence_number(self): # type: () -> Optional[int] """ + The unique number assigned to a message by Service Bus. The sequence number is a unique 64-bit integer + assigned to a message as it is accepted and stored by the broker and functions as its true identifier. + For partitioned entities, the topmost 16 bits reflect the partition identifier. + Sequence numbers monotonically increase. They roll over to 0 when the 48-64 bit range is exhausted. :rtype: int """ @@ -454,89 +713,25 @@ class ReceivedMessageBase(PeekMessage): def __init__(self, message, mode=ReceiveSettleMode.PeekLock, **kwargs): super(ReceivedMessageBase, self).__init__(message=message) self._settled = (mode == ReceiveSettleMode.ReceiveAndDelete) + self._received_timestamp_utc = utc_now() self._is_deferred_message = kwargs.get("is_deferred_message", False) self.auto_renew_error = None self._receiver = None # type: ignore self._expiry = None - @property - def settled(self): - # type: () -> bool - """Whether the message has been settled. - - This will aways be `True` for a message received using ReceiveAndDelete mode, - otherwise it will be `False` until the message is completed or otherwise settled. - - :rtype: bool - """ - return self._settled - - @property - def expired(self): - # type: () -> bool - """ - - :rtype: bool - """ - try: - if self._receiver.session: # pylint: disable=protected-access - raise TypeError("Session messages do not expire. Please use the Session expiry instead.") - except AttributeError: # Is not a session receiver - pass - if self.locked_until_utc and self.locked_until_utc <= utc_now(): - return True - return False - - @property - def locked_until_utc(self): - # type: () -> Optional[datetime.datetime] - """ - - :rtype: datetime.datetime - """ - try: - if self.settled or self._receiver.session: # pylint: disable=protected-access - return None - except AttributeError: # not settled, and isn't session receiver. - pass - if self._expiry: - return self._expiry - if self.message.annotations and _X_OPT_LOCKED_UNTIL in self.message.annotations: - expiry_in_seconds = self.message.annotations[_X_OPT_LOCKED_UNTIL]/1000 - self._expiry = utc_from_timestamp(expiry_in_seconds) - return self._expiry - - @property - def lock_token(self): - # type: () -> Optional[Union[uuid.UUID, str]] - """ - - :rtype: ~uuid.UUID or str - """ - if self.settled: - return None - - if self.message.delivery_tag: - return uuid.UUID(bytes_le=self.message.delivery_tag) - - delivery_annotations = self.message.delivery_annotations - if delivery_annotations: - return delivery_annotations.get(_X_OPT_LOCK_TOKEN) - return None - def _check_live(self, action): # pylint: disable=no-member if not self._receiver or not self._receiver._running: # pylint: disable=protected-access raise MessageSettleFailed(action, "Orphan message had no open connection.") - if self.settled: + if self._settled: raise MessageAlreadySettled(action) try: - if self.expired: + if self._lock_expired: raise MessageLockExpired(inner_exception=self.auto_renew_error) except TypeError: pass try: - if self._receiver.session.expired: + if self._receiver.session._lock_expired: # pylint: disable=protected-access raise SessionLockExpired(inner_exception=self._receiver.session.auto_renew_error) except AttributeError: pass @@ -594,6 +789,75 @@ def _settle_via_receiver_link(self, settle_operation, dead_letter_reason=None, d return functools.partial(self.message.modify, True, True) raise ValueError("Unsupported settle operation type: {}".format(settle_operation)) + @property + def _lock_expired(self): + # type: () -> bool + """ + Whether the lock on the message has expired. + + :rtype: bool + """ + try: + if self._receiver.session: # pylint: disable=protected-access + raise TypeError("Session messages do not expire. Please use the Session expiry instead.") + except AttributeError: # Is not a session receiver + pass + if self.locked_until_utc and self.locked_until_utc <= utc_now(): + return True + return False + + @property + def lock_token(self): + # type: () -> Optional[Union[uuid.UUID, str]] + """ + The lock token for the current message serving as a reference to the lock that + is being held by the broker in PeekLock mode. + + :rtype: ~uuid.UUID or str + """ + if self._settled: + return None + + if self.message.delivery_tag: + return uuid.UUID(bytes_le=self.message.delivery_tag) + + delivery_annotations = self.message.delivery_annotations + if delivery_annotations: + return delivery_annotations.get(_X_OPT_LOCK_TOKEN) + return None + + @property + def locked_until_utc(self): + # type: () -> Optional[datetime.datetime] + """ + The UTC datetime until which the message will be locked in the queue/subscription. + When the lock expires, delivery count of hte message is incremented and the message + is again available for retrieval. + + :rtype: datetime.datetime + """ + try: + if self._settled or self._receiver.session: # pylint: disable=protected-access + return None + except AttributeError: # not settled, and isn't session receiver. + pass + if self._expiry: + return self._expiry + if self.message.annotations and _X_OPT_LOCKED_UNTIL in self.message.annotations: + expiry_in_seconds = self.message.annotations[_X_OPT_LOCKED_UNTIL]/1000 + self._expiry = utc_from_timestamp(expiry_in_seconds) + return self._expiry + + @property + def settled(self): + # type: () -> bool + """Whether the message has been settled. + This will aways be `True` for a message received using ReceiveAndDelete mode, + otherwise it will be `False` until the message is completed or otherwise settled. + :rtype: bool + """ + return self._settled + class ReceivedMessage(ReceivedMessageBase): def _settle_message( diff --git a/sdk/servicebus/azure-servicebus/azure/servicebus/_common/receiver_mixins.py b/sdk/servicebus/azure-servicebus/azure/servicebus/_common/receiver_mixins.py index 842160c2a79b..f879ac5a8fd4 100644 --- a/sdk/servicebus/azure-servicebus/azure/servicebus/_common/receiver_mixins.py +++ b/sdk/servicebus/azure-servicebus/azure/servicebus/_common/receiver_mixins.py @@ -90,7 +90,7 @@ def _on_attach(self, source, target, properties, error): # pylint: disable=unus self._session._session_id = self._session_id def _check_live(self): - if self._session and self._session.expired: + if self._session and self._session._lock_expired: # pylint: disable=protected-access raise SessionLockExpired(inner_exception=self._session.auto_renew_error) def _populate_session_attributes(self, **kwargs): diff --git a/sdk/servicebus/azure-servicebus/azure/servicebus/_common/utils.py b/sdk/servicebus/azure-servicebus/azure/servicebus/_common/utils.py index 11ee8e0292b9..e6d047e8ad37 100644 --- a/sdk/servicebus/azure-servicebus/azure/servicebus/_common/utils.py +++ b/sdk/servicebus/azure-servicebus/azure/servicebus/_common/utils.py @@ -104,7 +104,7 @@ def create_properties(): def renewable_start_time(renewable): try: - return renewable.received_timestamp_utc + return renewable._received_timestamp_utc # pylint: disable=protected-access except AttributeError: pass try: @@ -205,9 +205,9 @@ def __exit__(self, *args): def _renewable(self, renewable): if self._shutdown.is_set(): return False - if hasattr(renewable, 'settled') and renewable.settled: + if hasattr(renewable, '_settled') and renewable._settled: # pylint: disable=protected-access return False - if renewable.expired: + if renewable._lock_expired: # pylint: disable=protected-access return False return True diff --git a/sdk/servicebus/azure-servicebus/azure/servicebus/_servicebus_sender.py b/sdk/servicebus/azure-servicebus/azure/servicebus/_servicebus_sender.py index 96d5442d741b..f5714bd8a1ae 100644 --- a/sdk/servicebus/azure-servicebus/azure/servicebus/_servicebus_sender.py +++ b/sdk/servicebus/azure-servicebus/azure/servicebus/_servicebus_sender.py @@ -70,9 +70,9 @@ def _build_schedule_request(cls, schedule_time_utc, *messages): " Received instead: {}".format(message.__class__.__name__)) message.scheduled_enqueue_time_utc = schedule_time_utc message_data = {} - message_data[MGMT_REQUEST_MESSAGE_ID] = message.properties.message_id - if message.properties.group_id: - message_data[MGMT_REQUEST_SESSION_ID] = message.properties.group_id + message_data[MGMT_REQUEST_MESSAGE_ID] = message.message_id + if message.session_id: + message_data[MGMT_REQUEST_SESSION_ID] = message.session_id if message.partition_key: message_data[MGMT_REQUEST_PARTITION_KEY] = message.partition_key if message.via_partition_key: diff --git a/sdk/servicebus/azure-servicebus/azure/servicebus/_servicebus_session.py b/sdk/servicebus/azure-servicebus/azure/servicebus/_servicebus_session.py index c8b254eccefe..1e991efcb85b 100644 --- a/sdk/servicebus/azure-servicebus/azure/servicebus/_servicebus_session.py +++ b/sdk/servicebus/azure-servicebus/azure/servicebus/_servicebus_session.py @@ -37,6 +37,19 @@ def __init__(self, session_id, receiver, encoding="UTF-8"): self._locked_until_utc = None # type: Optional[datetime.datetime] self.auto_renew_error = None + def _check_live(self): + if self._lock_expired: + raise SessionLockExpired(inner_exception=self.auto_renew_error) + + @property + def _lock_expired(self): + # type: () -> bool + """Whether the receivers lock on a particular session has expired. + + :rtype: bool + """ + return bool(self._locked_until_utc and self._locked_until_utc <= utc_now()) + @property def session_id(self): # type: () -> str @@ -47,15 +60,6 @@ def session_id(self): """ return self._session_id - @property - def expired(self): - # type: () -> bool - """Whether the receivers lock on a particular session has expired. - - :rtype: bool - """ - return bool(self._locked_until_utc and self._locked_until_utc <= utc_now()) - @property def locked_until_utc(self): # type: () -> Optional[datetime.datetime] @@ -65,10 +69,6 @@ def locked_until_utc(self): """ return self._locked_until_utc - def _check_live(self): - if self.expired: - raise SessionLockExpired(inner_exception=self.auto_renew_error) - class ServiceBusSession(BaseSession): """ diff --git a/sdk/servicebus/azure-servicebus/azure/servicebus/aio/_async_utils.py b/sdk/servicebus/azure-servicebus/azure/servicebus/aio/_async_utils.py index b1f1050d1754..b963ad2394db 100644 --- a/sdk/servicebus/azure-servicebus/azure/servicebus/aio/_async_utils.py +++ b/sdk/servicebus/azure-servicebus/azure/servicebus/aio/_async_utils.py @@ -118,9 +118,9 @@ async def __aexit__(self, *args): def _renewable(self, renewable): if self._shutdown.is_set(): return False - if hasattr(renewable, 'settled') and renewable.settled: + if hasattr(renewable, '_settled') and renewable._settled: # pylint: disable=protected-access return False - if renewable.expired: + if renewable._lock_expired: # pylint: disable=protected-access return False return True diff --git a/sdk/servicebus/azure-servicebus/samples/async_samples/session_pool_receive_async.py b/sdk/servicebus/azure-servicebus/samples/async_samples/session_pool_receive_async.py index 9dca5337a921..043e4ccf9a28 100644 --- a/sdk/servicebus/azure-servicebus/samples/async_samples/session_pool_receive_async.py +++ b/sdk/servicebus/azure-servicebus/samples/async_samples/session_pool_receive_async.py @@ -27,10 +27,9 @@ async def message_processing(servicebus_client, queue_name): await receiver.session.set_session_state("OPEN") async for message in receiver: print("Message: {}".format(message)) - print("Time to live: {}".format(message.header.time_to_live)) + print("Time to live: {}".format(message.time_to_live)) print("Sequence number: {}".format(message.sequence_number)) - print("Enqueue Sequence number: {}".format(message.enqueue_sequence_number)) - print("Partition ID: {}".format(message.partition_id)) + print("Enqueue Sequence number: {}".format(message.enqueued_sequence_number)) print("Partition Key: {}".format(message.partition_key)) print("Locked until: {}".format(message.locked_until_utc)) print("Lock Token: {}".format(message.lock_token)) diff --git a/sdk/servicebus/azure-servicebus/samples/sync_samples/sample_code_servicebus.py b/sdk/servicebus/azure-servicebus/samples/sync_samples/sample_code_servicebus.py index 19df1c3ff818..ffa3a68f7f78 100644 --- a/sdk/servicebus/azure-servicebus/samples/sync_samples/sample_code_servicebus.py +++ b/sdk/servicebus/azure-servicebus/samples/sync_samples/sample_code_servicebus.py @@ -199,11 +199,14 @@ def example_send_and_receive_sync(): # [END create_batch_sync] # [START send_complex_message] - message = Message("Hello World!!") - message.session_id = "MySessionID" - message.partition_key = "UsingSpecificPartition" - message.user_properties = {'data': 'custom_data'} - message.time_to_live = datetime.timedelta(seconds=30) + message = Message( + "Hello World!!", + session_id="MySessionID", + partition_key="UsingSpecificPartition", + user_properties={'data': 'custom_data'}, + time_to_live=datetime.timedelta(seconds=30), + label='MyLabel' + ) # [END send_complex_message] # [START peek_messages_sync] @@ -239,13 +242,11 @@ def example_send_and_receive_sync(): print("Receiving: {}".format(message)) print("Time to live: {}".format(message.time_to_live)) print("Sequence number: {}".format(message.sequence_number)) - print("Enqueue Sequence numger: {}".format(message.enqueue_sequence_number)) - print("Partition ID: {}".format(message.partition_id)) + print("Enqueued Sequence numger: {}".format(message.enqueued_sequence_number)) print("Partition Key: {}".format(message.partition_key)) - print("User Properties: {}".format(message.user_properties)) - print("Annotations: {}".format(message.annotations)) - print("Delivery count: {}".format(message.header.delivery_count)) - print("Message ID: {}".format(message.properties.message_id)) + print("Properties: {}".format(message.properties)) + print("Delivery count: {}".format(message.delivery_count)) + print("Message ID: {}".format(message.message_id)) print("Locked until: {}".format(message.locked_until_utc)) print("Lock Token: {}".format(message.lock_token)) print("Enqueued time: {}".format(message.enqueued_time_utc)) diff --git a/sdk/servicebus/azure-servicebus/samples/sync_samples/session_pool_receive.py b/sdk/servicebus/azure-servicebus/samples/sync_samples/session_pool_receive.py index e64705189aa2..26ddcb75cb72 100644 --- a/sdk/servicebus/azure-servicebus/samples/sync_samples/session_pool_receive.py +++ b/sdk/servicebus/azure-servicebus/samples/sync_samples/session_pool_receive.py @@ -26,10 +26,9 @@ def message_processing(sb_client, queue_name, messages): for message in receiver: messages.append(message) print("Message: {}".format(message)) - print("Time to live: {}".format(message.header.time_to_live)) + print("Time to live: {}".format(message.time_to_live)) print("Sequence number: {}".format(message.sequence_number)) - print("Enqueue Sequence number: {}".format(message.enqueue_sequence_number)) - print("Partition ID: {}".format(message.partition_id)) + print("Enqueue Sequence number: {}".format(message.enqueued_sequence_number)) print("Partition Key: {}".format(message.partition_key)) print("Locked until: {}".format(message.locked_until_utc)) print("Lock Token: {}".format(message.lock_token)) diff --git a/sdk/servicebus/azure-servicebus/tests/async_tests/test_queues_async.py b/sdk/servicebus/azure-servicebus/tests/async_tests/test_queues_async.py index c4e05ea3675a..3a0968ab5368 100644 --- a/sdk/servicebus/azure-servicebus/tests/async_tests/test_queues_async.py +++ b/sdk/servicebus/azure-servicebus/tests/async_tests/test_queues_async.py @@ -51,7 +51,6 @@ async def test_async_queue_by_queue_client_conn_str_receive_handler_peeklock(sel async with sb_client.get_queue_sender(servicebus_queue.name) as sender: for i in range(10): message = Message("Handler message no. {}".format(i)) - message.enqueue_sequence_number = i await sender.send_messages(message) with pytest.raises(ServiceBusConnectionError): @@ -129,7 +128,7 @@ async def test_github_issue_6178_async(self, servicebus_namespace_connection_str _logger.debug(message) _logger.debug(message.sequence_number) _logger.debug(message.enqueued_time_utc) - _logger.debug(message.expired) + _logger.debug(message._lock_expired) await message.complete() await asyncio.sleep(40) @@ -146,7 +145,6 @@ async def test_async_queue_by_queue_client_conn_str_receive_handler_receiveandde async with sb_client.get_queue_sender(servicebus_queue.name) as sender: for i in range(10): message = Message("Handler message no. {}".format(i)) - message.enqueue_sequence_number = i await sender.send_messages(message) messages = [] @@ -252,11 +250,11 @@ async def test_async_queue_by_servicebus_conn_str_client_iter_messages_with_aban count = 0 async for message in receiver: print_message(_logger, message) - if not message.header.delivery_count: + if not message.delivery_count: count += 1 await message.abandon() else: - assert message.header.delivery_count == 1 + assert message.delivery_count == 1 await message.complete() assert count == 10 @@ -407,8 +405,10 @@ async def test_async_queue_by_servicebus_client_iter_messages_with_retrieve_defe async for message in receiver: count += 1 print_message(_logger, message) - assert message.user_properties[b'DeadLetterReason'] == b'Testing reason' - assert message.user_properties[b'DeadLetterErrorDescription'] == b'Testing description' + assert message.dead_letter_reason == 'Testing reason' + assert message.dead_letter_error_description == 'Testing description' + assert message.properties[b'DeadLetterReason'] == b'Testing reason' + assert message.properties[b'DeadLetterErrorDescription'] == b'Testing description' await message.complete() assert count == 10 @@ -521,8 +521,10 @@ async def test_async_queue_by_servicebus_client_receive_batch_with_deadletter(se async for message in dl_receiver: await message.complete() count += 1 - assert message.user_properties[b'DeadLetterReason'] == b'Testing reason' - assert message.user_properties[b'DeadLetterErrorDescription'] == b'Testing description' + assert message.dead_letter_reason == 'Testing reason' + assert message.dead_letter_error_description == 'Testing description' + assert message.properties[b'DeadLetterReason'] == b'Testing reason' + assert message.properties[b'DeadLetterErrorDescription'] == b'Testing description' assert count == 10 @pytest.mark.liveTest @@ -560,8 +562,10 @@ async def test_async_queue_by_servicebus_client_receive_batch_with_retrieve_dead count = 0 async for message in receiver: print_message(_logger, message) - assert message.user_properties[b'DeadLetterReason'] == b'Testing reason' - assert message.user_properties[b'DeadLetterErrorDescription'] == b'Testing description' + assert message.dead_letter_reason == 'Testing reason' + assert message.dead_letter_error_description == 'Testing description' + assert message.properties[b'DeadLetterReason'] == b'Testing reason' + assert message.properties[b'DeadLetterErrorDescription'] == b'Testing description' await message.complete() count += 1 assert count == 10 @@ -665,7 +669,7 @@ async def test_async_queue_by_servicebus_client_renew_message_locks(self, servic try: with pytest.raises(AttributeError): - assert not message.expired + assert not message._lock_expired for m in messages: time.sleep(5) initial_expiry = m.locked_until_utc @@ -698,29 +702,29 @@ async def test_async_queue_by_queue_client_conn_str_receive_handler_with_autoloc async for message in receiver: if not messages: messages.append(message) - assert not message.expired + assert not message._lock_expired renewer.register(message, timeout=60) print("Registered lock renew thread", message.locked_until_utc, utc_now()) await asyncio.sleep(60) print("Finished first sleep", message.locked_until_utc) - assert not message.expired + assert not message._lock_expired await asyncio.sleep(15) #generate autolockrenewtimeout error by going one iteration past. sleep_until_expired(message) print("Finished second sleep", message.locked_until_utc, utc_now()) - assert message.expired + assert message._lock_expired try: await message.complete() raise AssertionError("Didn't raise MessageLockExpired") except MessageLockExpired as e: assert isinstance(e.inner_exception, AutoLockRenewTimeout) else: - if message.expired: + if message._lock_expired: print("Remaining messages", message.locked_until_utc, utc_now()) - assert message.expired + assert message._lock_expired with pytest.raises(MessageLockExpired): await message.complete() else: - assert message.header.delivery_count >= 1 + assert message.delivery_count >= 1 print("Remaining messages", message.locked_until_utc, utc_now()) messages.append(message) await message.complete() @@ -789,14 +793,14 @@ async def test_async_queue_message_duplicate_detection(self, servicebus_namespac async with sb_client.get_queue_sender(servicebus_queue.name) as sender: for i in range(5): message = Message(str(i)) - message.properties.message_id = message_id + message.message_id = message_id await sender.send_messages(message) async with sb_client.get_queue_receiver(servicebus_queue.name, idle_timeout=5) as receiver: count = 0 async for message in receiver: print_message(_logger, message) - assert message.properties.message_id == message_id + assert message.message_id == message_id await message.complete() count += 1 assert count == 1 @@ -840,7 +844,7 @@ async def test_async_queue_message_expiry(self, servicebus_namespace_connection_ messages = await receiver.receive_messages(max_wait_time=10) assert len(messages) == 1 time.sleep(60) - assert messages[0].expired + assert messages[0]._lock_expired with pytest.raises(MessageLockExpired): await messages[0].complete() with pytest.raises(MessageLockExpired): @@ -850,7 +854,7 @@ async def test_async_queue_message_expiry(self, servicebus_namespace_connection_ messages = await receiver.receive_messages(max_wait_time=30) assert len(messages) == 1 print_message(_logger, messages[0]) - assert messages[0].header.delivery_count > 0 + assert messages[0].delivery_count > 0 await messages[0].complete() @pytest.mark.liveTest @@ -875,7 +879,7 @@ async def test_async_queue_message_lock_renew(self, servicebus_namespace_connect time.sleep(15) await messages[0].renew_lock() time.sleep(15) - assert not messages[0].expired + assert not messages[0]._lock_expired await messages[0].complete() async with sb_client.get_queue_receiver(servicebus_queue.name) as receiver: @@ -960,7 +964,7 @@ async def test_async_queue_schedule_message(self, servicebus_namespace_connectio content = str(uuid.uuid4()) message_id = uuid.uuid4() message = Message(content) - message.properties.message_id = message_id + message.message_id = message_id message.scheduled_enqueue_time_utc = enqueue_time await sender.send_messages(message) @@ -969,7 +973,7 @@ async def test_async_queue_schedule_message(self, servicebus_namespace_connectio try: data = str(messages[0]) assert data == content - assert messages[0].properties.message_id == message_id + assert messages[0].message_id == message_id assert messages[0].scheduled_enqueue_time_utc == enqueue_time assert messages[0].scheduled_enqueue_time_utc == messages[0].enqueued_time_utc.replace(microsecond=0) assert len(messages) == 1 @@ -994,10 +998,10 @@ async def test_async_queue_schedule_multiple_messages(self, servicebus_namespace content = str(uuid.uuid4()) message_id_a = uuid.uuid4() message_a = Message(content) - message_a.properties.message_id = message_id_a + message_a.message_id = message_id_a message_id_b = uuid.uuid4() message_b = Message(content) - message_b.properties.message_id = message_id_b + message_b.message_id = message_id_b tokens = await sender.schedule_messages([message_a, message_b], enqueue_time) assert len(tokens) == 2 @@ -1009,7 +1013,7 @@ async def test_async_queue_schedule_multiple_messages(self, servicebus_namespace try: data = str(messages[0]) assert data == content - assert messages[0].properties.message_id in (message_id_a, message_id_b) + assert messages[0].message_id in (message_id_a, message_id_b) assert messages[0].scheduled_enqueue_time_utc == enqueue_time assert messages[0].scheduled_enqueue_time_utc == messages[0].enqueued_time_utc.replace(microsecond=0) assert len(messages) == 2 @@ -1107,8 +1111,8 @@ async def test_queue_message_settle_through_mgmt_link_due_to_broken_receiver_lin async def test_async_queue_mock_no_reusing_auto_lock_renew(self): class MockReceivedMessage: def __init__(self): - self.received_timestamp_utc = utc_now() - self.locked_until_utc = self.received_timestamp_utc + timedelta(seconds=10) + self._received_timestamp_utc = utc_now() + self.locked_until_utc = self._received_timestamp_utc + timedelta(seconds=10) async def renew_lock(self): self.locked_until_utc = self.locked_until_utc + timedelta(seconds=10) diff --git a/sdk/servicebus/azure-servicebus/tests/async_tests/test_sessions_async.py b/sdk/servicebus/azure-servicebus/tests/async_tests/test_sessions_async.py index 5645fa58e5a5..450ec473f338 100644 --- a/sdk/servicebus/azure-servicebus/tests/async_tests/test_sessions_async.py +++ b/sdk/servicebus/azure-servicebus/tests/async_tests/test_sessions_async.py @@ -255,8 +255,10 @@ async def test_async_session_by_servicebus_client_iter_messages_with_retrieve_de async for message in receiver: count += 1 print_message(_logger, message) - assert message.user_properties[b'DeadLetterReason'] == b'Testing reason' - assert message.user_properties[b'DeadLetterErrorDescription'] == b'Testing description' + assert message.dead_letter_reason == 'Testing reason' + assert message.dead_letter_error_description == 'Testing description' + assert message.properties[b'DeadLetterReason'] == b'Testing reason' + assert message.properties[b'DeadLetterErrorDescription'] == b'Testing description' await message.complete() assert count == 10 @@ -355,8 +357,10 @@ async def test_async_session_by_servicebus_client_fetch_next_with_retrieve_deadl count = 0 async for message in session: print_message(_logger, message) - assert message.user_properties[b'DeadLetterReason'] == b'Testing reason' - assert message.user_properties[b'DeadLetterErrorDescription'] == b'Testing description' + assert message.dead_letter_reason == 'Testing reason' + assert message.dead_letter_error_description == 'Testing description' + assert message.properties[b'DeadLetterReason'] == b'Testing reason' + assert message.properties[b'DeadLetterErrorDescription'] == b'Testing description' await message.complete() count += 1 assert count == 10 @@ -447,7 +451,7 @@ async def test_async_session_by_servicebus_client_renew_client_locks(self, servi try: for m in messages: with pytest.raises(TypeError): - expired = m.expired + expired = m._lock_expired assert m.locked_until_utc is None assert m.lock_token is not None time.sleep(5) @@ -487,9 +491,9 @@ async def test_async_session_by_conn_str_receive_handler_with_autolockrenew(self if not messages: await asyncio.sleep(45) print("First sleep {}".format(session.session.locked_until_utc - utc_now())) - assert not session.session.expired + assert not session.session._lock_expired with pytest.raises(TypeError): - message.expired + message._lock_expired assert message.locked_until_utc is None with pytest.raises(TypeError): await message.renew_lock() @@ -500,7 +504,7 @@ async def test_async_session_by_conn_str_receive_handler_with_autolockrenew(self elif len(messages) == 1: await asyncio.sleep(45) print("Second sleep {}".format(session.session.locked_until_utc - utc_now())) - assert session.session.expired + assert session.session._lock_expired assert isinstance(session.session.auto_renew_error, AutoLockRenewTimeout) try: await message.complete() @@ -559,10 +563,10 @@ async def test_async_session_message_expiry(self, servicebus_namespace_connectio print_message(_logger, messages[0]) await asyncio.sleep(60) #TODO: Was 30, but then lock isn't expired. with pytest.raises(TypeError): - messages[0].expired + messages[0]._lock_expired with pytest.raises(TypeError): await messages[0].renew_lock() - assert receiver.session.expired + assert receiver.session._lock_expired with pytest.raises(SessionLockExpired): await messages[0].complete() with pytest.raises(SessionLockExpired): @@ -572,7 +576,7 @@ async def test_async_session_message_expiry(self, servicebus_namespace_connectio messages = await receiver.receive_messages(max_wait_time=30) assert len(messages) == 1 print_message(_logger, messages[0]) - #assert messages[0].header.delivery_count # TODO confirm this with service + assert messages[0].delivery_count await messages[0].complete() @@ -591,7 +595,7 @@ async def test_async_session_schedule_message(self, servicebus_namespace_connect content = str(uuid.uuid4()) message_id = uuid.uuid4() message = Message(content, session_id=session_id) - message.properties.message_id = message_id + message.message_id = message_id message.scheduled_enqueue_time_utc = enqueue_time await sender.send_messages(message) @@ -604,7 +608,7 @@ async def test_async_session_schedule_message(self, servicebus_namespace_connect if messages: data = str(messages[0]) assert data == content - assert messages[0].properties.message_id == message_id + assert messages[0].message_id == message_id assert messages[0].scheduled_enqueue_time_utc == enqueue_time assert messages[0].scheduled_enqueue_time_utc == messages[0].enqueued_time_utc.replace(microsecond=0) assert len(messages) == 1 @@ -629,10 +633,10 @@ async def test_async_session_schedule_multiple_messages(self, servicebus_namespa content = str(uuid.uuid4()) message_id_a = uuid.uuid4() message_a = Message(content, session_id=session_id) - message_a.properties.message_id = message_id_a + message_a.message_id = message_id_a message_id_b = uuid.uuid4() message_b = Message(content, session_id=session_id) - message_b.properties.message_id = message_id_b + message_b.message_id = message_id_b tokens = await sender.schedule_messages([message_a, message_b], enqueue_time) assert len(tokens) == 2 @@ -644,7 +648,7 @@ async def test_async_session_schedule_multiple_messages(self, servicebus_namespa if messages: data = str(messages[0]) assert data == content - assert messages[0].properties.message_id in (message_id_a, message_id_b) + assert messages[0].message_id in (message_id_a, message_id_b) assert messages[0].scheduled_enqueue_time_utc == enqueue_time assert messages[0].scheduled_enqueue_time_utc == messages[0].enqueued_time_utc.replace(microsecond=0) assert len(messages) == 2 @@ -707,7 +711,7 @@ async def test_async_session_get_set_state_with_receiver(self, servicebus_namesp await session.session.set_session_state("first_state") count = 0 async for m in session: - assert m.properties.group_id == session_id.encode('utf-8') + assert m.session_id == session_id count += 1 await session.session.get_session_state() assert count == 3 diff --git a/sdk/servicebus/azure-servicebus/tests/async_tests/test_subscriptions_async.py b/sdk/servicebus/azure-servicebus/tests/async_tests/test_subscriptions_async.py index 70bff4a022f6..70dcb43b4e0a 100644 --- a/sdk/servicebus/azure-servicebus/tests/async_tests/test_subscriptions_async.py +++ b/sdk/servicebus/azure-servicebus/tests/async_tests/test_subscriptions_async.py @@ -148,6 +148,8 @@ async def test_topic_by_servicebus_client_receive_batch_with_deadletter(self, se async for message in dl_receiver: await message.complete() count += 1 - assert message.user_properties[b'DeadLetterReason'] == b'Testing reason' - assert message.user_properties[b'DeadLetterErrorDescription'] == b'Testing description' + assert message.dead_letter_reason == 'Testing reason' + assert message.dead_letter_error_description == 'Testing description' + assert message.properties[b'DeadLetterReason'] == b'Testing reason' + assert message.properties[b'DeadLetterErrorDescription'] == b'Testing description' assert count == 10 diff --git a/sdk/servicebus/azure-servicebus/tests/test_queues.py b/sdk/servicebus/azure-servicebus/tests/test_queues.py index f13062f01c0e..1185bfb5136c 100644 --- a/sdk/servicebus/azure-servicebus/tests/test_queues.py +++ b/sdk/servicebus/azure-servicebus/tests/test_queues.py @@ -11,10 +11,18 @@ import time import uuid from datetime import datetime, timedelta +import calendar +import uamqp from azure.servicebus import ServiceBusClient, AutoLockRenew, TransportType from azure.servicebus._common.message import Message, PeekMessage, ReceivedMessage, BatchMessage -from azure.servicebus._common.constants import ReceiveSettleMode, _X_OPT_LOCK_TOKEN +from azure.servicebus._common.constants import ( + ReceiveSettleMode, + _X_OPT_LOCK_TOKEN, + _X_OPT_PARTITION_KEY, + _X_OPT_VIA_PARTITION_KEY, + _X_OPT_SCHEDULED_ENQUEUE_TIME +) from azure.servicebus._common.utils import utc_now from azure.servicebus.exceptions import ( ServiceBusConnectionError, @@ -82,7 +90,7 @@ def test_github_issue_6178(self, servicebus_namespace_connection_string, service _logger.debug(message) _logger.debug(message.sequence_number) _logger.debug(message.enqueued_time_utc) - _logger.debug(message.expired) + _logger.debug(message._lock_expired) message.complete() time.sleep(40) @@ -99,16 +107,41 @@ def test_queue_by_queue_client_conn_str_receive_handler_peeklock(self, servicebu with sb_client.get_queue_sender(servicebus_queue.name) as sender: for i in range(10): message = Message("Handler message no. {}".format(i)) - message.enqueue_sequence_number = i + message.properties = {'key': 'value'} + message.label = 'label' + message.content_type = 'application/text' + message.correlation_id = 'cid' + message.message_id = str(i) + message.partition_key = 'pk' + message.via_partition_key = 'via_pk' + message.to = 'to' + message.reply_to = 'reply_to' sender.send_messages(message) receiver = sb_client.get_queue_receiver(servicebus_queue.name, idle_timeout=5) count = 0 for message in receiver: print_message(_logger, message) + assert message.delivery_count == 0 + assert message.properties + assert message.properties[b'key'] == b'value' + assert message.label == 'label' + assert message.content_type == 'application/text' + assert message.correlation_id == 'cid' + assert message.message_id == str(count) + assert message.partition_key == 'pk' + assert message.via_partition_key == 'via_pk' + assert message.to == 'to' + assert message.reply_to == 'reply_to' + assert message.sequence_number + assert message.enqueued_time_utc assert message.message.delivery_tag is not None assert message.lock_token == message.message.delivery_annotations.get(_X_OPT_LOCK_TOKEN) assert message.lock_token == uuid.UUID(bytes_le=message.message.delivery_tag) + assert not message.scheduled_enqueue_time_utc + assert not message.time_to_live + assert not message.session_id + assert not message.reply_to_session_id count += 1 message.complete() @@ -127,6 +160,15 @@ def test_queue_by_queue_client_send_multiple_messages(self, servicebus_namespace messages = [] for i in range(10): message = Message("Handler message no. {}".format(i)) + message.partition_key = 'pkey' + message.via_partition_key = 'vpkey' + message.time_to_live = timedelta(seconds=60) + message.scheduled_enqueue_time_utc = utc_now() + timedelta(seconds=60) + message.partition_key = None + message.via_partition_key = None + message.time_to_live = None + message.scheduled_enqueue_time_utc = None + message.session_id = None messages.append(message) sender.send_messages(messages) @@ -134,6 +176,19 @@ def test_queue_by_queue_client_send_multiple_messages(self, servicebus_namespace count = 0 for message in receiver: print_message(_logger, message) + assert message.delivery_count == 0 + assert not message.properties + assert not message.label + assert not message.content_type + assert not message.correlation_id + assert not message.partition_key + assert not message.via_partition_key + assert not message.to + assert not message.reply_to + assert not message.scheduled_enqueue_time_utc + assert not message.time_to_live + assert not message.session_id + assert not message.reply_to_session_id count += 1 message.complete() @@ -152,7 +207,6 @@ def test_queue_by_queue_client_conn_str_receive_handler_receiveanddelete(self, s with sb_client.get_queue_sender(servicebus_queue.name) as sender: for i in range(10): message = Message("Handler message no. {}".format(i)) - message.enqueue_sequence_number = i sender.send_messages(message) messages = [] @@ -160,6 +214,18 @@ def test_queue_by_queue_client_conn_str_receive_handler_receiveanddelete(self, s mode=ReceiveSettleMode.ReceiveAndDelete, idle_timeout=5) as receiver: for message in receiver: + assert not message.properties + assert not message.label + assert not message.content_type + assert not message.correlation_id + assert not message.partition_key + assert not message.via_partition_key + assert not message.to + assert not message.reply_to + assert not message.scheduled_enqueue_time_utc + assert not message.time_to_live + assert not message.session_id + assert not message.reply_to_session_id messages.append(message) with pytest.raises(MessageAlreadySettled): message.complete() @@ -269,11 +335,11 @@ def test_queue_by_servicebus_conn_str_client_iter_messages_with_abandon(self, se count = 0 for message in receiver: print_message(_logger, message) - if not message.header.delivery_count: + if not message.delivery_count: count += 1 message.abandon() else: - assert message.header.delivery_count == 1 + assert message.delivery_count == 1 message.complete() assert count == 10 @@ -446,8 +512,10 @@ def test_queue_by_servicebus_client_iter_messages_with_retrieve_deferred_receive for message in receiver: count += 1 print_message(_logger, message) - assert message.user_properties[b'DeadLetterReason'] == b'Testing reason' - assert message.user_properties[b'DeadLetterErrorDescription'] == b'Testing description' + assert message.dead_letter_reason == 'Testing reason' + assert message.dead_letter_error_description == 'Testing description' + assert message.properties[b'DeadLetterReason'] == b'Testing reason' + assert message.properties[b'DeadLetterErrorDescription'] == b'Testing description' message.complete() assert count == 10 @@ -570,8 +638,10 @@ def test_queue_by_servicebus_client_receive_batch_with_deadletter(self, serviceb for message in dl_receiver: message.complete() count += 1 - assert message.user_properties[b'DeadLetterReason'] == b'Testing reason' - assert message.user_properties[b'DeadLetterErrorDescription'] == b'Testing description' + assert message.dead_letter_reason == 'Testing reason' + assert message.dead_letter_error_description == 'Testing description' + assert message.properties[b'DeadLetterReason'] == b'Testing reason' + assert message.properties[b'DeadLetterErrorDescription'] == b'Testing description' assert count == 10 @pytest.mark.liveTest @@ -614,8 +684,10 @@ def test_queue_by_servicebus_client_receive_batch_with_retrieve_deadletter(self, count = 0 for message in dl_receiver: print_message(_logger, message) - assert message.user_properties[b'DeadLetterReason'] == b'Testing reason' - assert message.user_properties[b'DeadLetterErrorDescription'] == b'Testing description' + assert message.dead_letter_reason == 'Testing reason' + assert message.dead_letter_error_description == 'Testing description' + assert message.properties[b'DeadLetterReason'] == b'Testing reason' + assert message.properties[b'DeadLetterErrorDescription'] == b'Testing description' message.complete() count += 1 assert count == 10 @@ -757,7 +829,7 @@ def test_queue_by_servicebus_client_renew_message_locks(self, servicebus_namespa try: for m in messages: - assert not m.expired + assert not m._lock_expired time.sleep(5) initial_expiry = m.locked_until_utc m.renew_lock() @@ -794,29 +866,29 @@ def test_queue_by_queue_client_conn_str_receive_handler_with_autolockrenew(self, for message in receiver: if not messages: messages.append(message) - assert not message.expired + assert not message._lock_expired renewer.register(message, timeout=60) print("Registered lock renew thread", message.locked_until_utc, utc_now()) time.sleep(60) print("Finished first sleep", message.locked_until_utc) - assert not message.expired + assert not message._lock_expired time.sleep(15) #generate autolockrenewtimeout error by going one iteration past. sleep_until_expired(message) print("Finished second sleep", message.locked_until_utc, utc_now()) - assert message.expired + assert message._lock_expired try: message.complete() raise AssertionError("Didn't raise MessageLockExpired") except MessageLockExpired as e: assert isinstance(e.inner_exception, AutoLockRenewTimeout) else: - if message.expired: + if message._lock_expired: print("Remaining messages", message.locked_until_utc, utc_now()) - assert message.expired + assert message._lock_expired with pytest.raises(MessageLockExpired): message.complete() else: - assert message.header.delivery_count >= 1 + assert message.delivery_count >= 1 print("Remaining messages", message.locked_until_utc, utc_now()) messages.append(message) message.complete() @@ -871,7 +943,7 @@ def test_queue_message_duplicate_detection(self, servicebus_namespace_connection with sb_client.get_queue_sender(servicebus_queue.name) as sender: for i in range(5): message = Message(str(i)) - message.properties.message_id = message_id + message.message_id = message_id sender.send_messages(message) with sb_client.get_queue_receiver(servicebus_queue.name, @@ -879,7 +951,7 @@ def test_queue_message_duplicate_detection(self, servicebus_namespace_connection count = 0 for message in receiver: print_message(_logger, message) - assert message.properties.message_id == message_id + assert message.message_id == message_id message.complete() count += 1 assert count == 1 @@ -927,7 +999,7 @@ def test_queue_message_expiry(self, servicebus_namespace_connection_string, serv messages = receiver.receive_messages(max_wait_time=10) assert len(messages) == 1 time.sleep((messages[0].locked_until_utc - utc_now()).total_seconds()+1) - assert messages[0].expired + assert messages[0]._lock_expired with pytest.raises(MessageLockExpired): messages[0].complete() with pytest.raises(MessageLockExpired): @@ -937,7 +1009,7 @@ def test_queue_message_expiry(self, servicebus_namespace_connection_string, serv messages = receiver.receive_messages(max_wait_time=30) assert len(messages) == 1 print_message(_logger, messages[0]) - assert messages[0].header.delivery_count > 0 + assert messages[0].delivery_count > 0 messages[0].complete() @@ -964,7 +1036,7 @@ def test_queue_message_lock_renew(self, servicebus_namespace_connection_string, time.sleep(15) messages[0].renew_lock() time.sleep(15) - assert not messages[0].expired + assert not messages[0]._lock_expired messages[0].complete() with sb_client.get_queue_receiver(servicebus_queue.name) as receiver: @@ -1024,9 +1096,20 @@ def test_queue_message_batch(self, servicebus_namespace_connection_string, servi def message_content(): for i in range(5): - yield Message("Message no. {}".format(i)) - - + message = Message("Message no. {}".format(i)) + message.properties = {'key': 'value'} + message.label = 'label' + message.content_type = 'application/text' + message.correlation_id = 'cid' + message.message_id = str(i) + message.partition_key = 'pk' + message.via_partition_key = 'via_pk' + message.to = 'to' + message.reply_to = 'reply_to' + message.time_to_live = timedelta(seconds=60) + + yield message + with sb_client.get_queue_sender(servicebus_queue.name) as sender: message = BatchMessage() for each in message_content(): @@ -1041,9 +1124,25 @@ def message_content(): messages.extend(recv) assert len(messages) == 5 - for m in messages: - print_message(_logger, m) - m.complete() + count = 0 + for message in messages: + assert message.delivery_count == 0 + assert message.properties + assert message.properties[b'key'] == b'value' + assert message.label == 'label' + assert message.content_type == 'application/text' + assert message.correlation_id == 'cid' + assert message.message_id == str(count) + assert message.partition_key == 'pk' + assert message.via_partition_key == 'via_pk' + assert message.to == 'to' + assert message.reply_to == 'reply_to' + assert message.sequence_number + assert message.enqueued_time_utc + assert message.expires_at_utc == (message.enqueued_time_utc + timedelta(seconds=60)) + print_message(_logger, message) + message.complete() + count += 1 @pytest.mark.liveTest @@ -1062,7 +1161,7 @@ def test_queue_schedule_message(self, servicebus_namespace_connection_string, se content = str(uuid.uuid4()) message_id = uuid.uuid4() message = Message(content) - message.properties.message_id = message_id + message.message_id = message_id message.scheduled_enqueue_time_utc = enqueue_time sender.send_messages(message) @@ -1071,7 +1170,7 @@ def test_queue_schedule_message(self, servicebus_namespace_connection_string, se try: data = str(messages[0]) assert data == content - assert messages[0].properties.message_id == message_id + assert messages[0].message_id == message_id assert messages[0].scheduled_enqueue_time_utc == enqueue_time assert messages[0].scheduled_enqueue_time_utc == messages[0].enqueued_time_utc.replace(microsecond=0) assert len(messages) == 1 @@ -1099,11 +1198,21 @@ def test_queue_schedule_multiple_messages(self, servicebus_namespace_connection_ content = str(uuid.uuid4()) message_id_a = uuid.uuid4() message_a = Message(content) - message_a.properties.message_id = message_id_a + message_a.message_id = message_id_a message_id_b = uuid.uuid4() message_b = Message(content) - message_b.properties.message_id = message_id_b - tokens = sender.schedule_messages([message_a, message_b], enqueue_time) + message_b.message_id = message_id_b + message_arry = [message_a, message_b] + for message in message_arry: + message.properties = {'key': 'value'} + message.label = 'label' + message.content_type = 'application/text' + message.correlation_id = 'cid' + message.partition_key = 'pk' + message.via_partition_key = 'via_pk' + message.to = 'to' + message.reply_to = 'reply_to' + tokens = sender.schedule_messages(message_arry, enqueue_time) assert len(tokens) == 2 messages = receiver.receive_messages(max_wait_time=120) @@ -1112,9 +1221,22 @@ def test_queue_schedule_multiple_messages(self, servicebus_namespace_connection_ try: data = str(messages[0]) assert data == content - assert messages[0].properties.message_id in (message_id_a, message_id_b) + assert messages[0].message_id in (message_id_a, message_id_b) assert messages[0].scheduled_enqueue_time_utc == enqueue_time assert messages[0].scheduled_enqueue_time_utc == messages[0].enqueued_time_utc.replace(microsecond=0) + assert messages[0].delivery_count == 0 + assert messages[0].properties + assert messages[0].properties[b'key'] == b'value' + assert messages[0].label == 'label' + assert messages[0].content_type == 'application/text' + assert messages[0].correlation_id == 'cid' + assert messages[0].partition_key == 'pk' + assert messages[0].via_partition_key == 'via_pk' + assert messages[0].to == 'to' + assert messages[0].reply_to == 'reply_to' + assert messages[0].sequence_number + assert messages[0].enqueued_time_utc + assert messages[0].message.delivery_tag is not None assert len(messages) == 2 finally: for m in messages: @@ -1218,8 +1340,8 @@ def test_queue_message_settle_through_mgmt_link_due_to_broken_receiver_link(self def test_queue_mock_no_reusing_auto_lock_renew(self): class MockReceivedMessage: def __init__(self): - self.received_timestamp_utc = utc_now() - self.locked_until_utc = self.received_timestamp_utc + timedelta(seconds=10) + self._received_timestamp_utc = utc_now() + self.locked_until_utc = self._received_timestamp_utc + timedelta(seconds=10) def renew_lock(self): self.locked_until_utc = self.locked_until_utc + timedelta(seconds=10) @@ -1251,6 +1373,91 @@ def renew_lock(self): with pytest.raises(ServiceBusError): auto_lock_renew.register(renewable=MockReceivedMessage()) + def test_queue_message_properties(self): + scheduled_enqueue_time = (utc_now() + timedelta(seconds=20)).replace(microsecond=0) + message = Message( + body='data', + properties={'key': 'value'}, + session_id='sid', + label='label', + content_type='application/text', + correlation_id='cid', + message_id='mid', + partition_key='pk', + via_partition_key='via_pk', + to='to', + reply_to='reply_to', + reply_to_session_id='reply_to_sid', + scheduled_enqueue_time_utc=scheduled_enqueue_time + ) + + assert message.properties + assert message.properties['key'] == 'value' + assert message.label == 'label' + assert message.content_type == 'application/text' + assert message.correlation_id == 'cid' + assert message.message_id == 'mid' + assert message.partition_key == 'pk' + assert message.via_partition_key == 'via_pk' + assert message.to == 'to' + assert message.reply_to == 'reply_to' + assert message.session_id == 'sid' + assert message.reply_to_session_id == 'reply_to_sid' + assert message.scheduled_enqueue_time_utc == scheduled_enqueue_time + + message.partition_key = 'updated' + message.via_partition_key = 'updated' + new_scheduled_time = (utc_now() + timedelta(hours=5)).replace(microsecond=0) + message.scheduled_enqueue_time_utc = new_scheduled_time + assert message.partition_key == 'updated' + assert message.via_partition_key == 'updated' + assert message.scheduled_enqueue_time_utc == new_scheduled_time + + message.partition_key = None + message.via_partition_key = None + message.scheduled_enqueue_time_utc = None + + assert message.partition_key is None + assert message.via_partition_key is None + assert message.scheduled_enqueue_time_utc is None + + try: + timestamp = new_scheduled_time.timestamp() * 1000 + except AttributeError: + timestamp = calendar.timegm(new_scheduled_time.timetuple()) * 1000 + + uamqp_received_message = uamqp.message.Message( + body=b'data', + annotations={ + _X_OPT_PARTITION_KEY: b'r_key', + _X_OPT_VIA_PARTITION_KEY: b'r_via_key', + _X_OPT_SCHEDULED_ENQUEUE_TIME: timestamp, + }, + properties=uamqp.message.MessageProperties() + ) + received_message = ReceivedMessage(uamqp_received_message) + assert received_message.partition_key == 'r_key' + assert received_message.via_partition_key == 'r_via_key' + assert received_message.scheduled_enqueue_time_utc == new_scheduled_time + + new_scheduled_time = utc_now() + timedelta(hours=1, minutes=49, seconds=32) + + received_message.partition_key = 'new_r_key' + received_message.via_partition_key = 'new_r_via_key' + received_message.scheduled_enqueue_time_utc = new_scheduled_time + + assert received_message.partition_key == 'new_r_key' + assert received_message.via_partition_key == 'new_r_via_key' + assert received_message.scheduled_enqueue_time_utc == new_scheduled_time + + received_message.partition_key = None + received_message.via_partition_key = None + received_message.scheduled_enqueue_time_utc = None + + assert message.partition_key is None + assert message.via_partition_key is None + assert message.scheduled_enqueue_time_utc is None + @pytest.mark.liveTest @pytest.mark.live_test_only @CachedResourceGroupPreparer(name_prefix='servicebustest') diff --git a/sdk/servicebus/azure-servicebus/tests/test_sessions.py b/sdk/servicebus/azure-servicebus/tests/test_sessions.py index dba7fcb8165a..d72721d8c182 100644 --- a/sdk/servicebus/azure-servicebus/tests/test_sessions.py +++ b/sdk/servicebus/azure-servicebus/tests/test_sessions.py @@ -53,7 +53,18 @@ def test_session_by_session_client_conn_str_receive_handler_peeklock(self, servi session_id = str(uuid.uuid4()) with sb_client.get_queue_sender(servicebus_queue.name) as sender: for i in range(3): - message = Message("Handler message no. {}".format(i), session_id=session_id) + message = Message("Handler message no. {}".format(i)) + message.session_id = session_id + message.properties = {'key': 'value'} + message.label = 'label' + message.content_type = 'application/text' + message.correlation_id = 'cid' + message.message_id = str(i) + message.partition_key = 'pk' + message.via_partition_key = 'via_pk' + message.to = 'to' + message.reply_to = 'reply_to' + message.reply_to_session_id = 'reply_to_session_id' sender.send_messages(message) with pytest.raises(ServiceBusConnectionError): @@ -63,7 +74,21 @@ def test_session_by_session_client_conn_str_receive_handler_peeklock(self, servi count = 0 for message in session: print_message(_logger, message) + assert message.delivery_count == 0 + assert message.properties + assert message.properties[b'key'] == b'value' + assert message.label == 'label' + assert message.content_type == 'application/text' + assert message.correlation_id == 'cid' + assert message.message_id == str(count) + assert message.partition_key == 'pk' + assert message.via_partition_key == 'via_pk' + assert message.to == 'to' + assert message.reply_to == 'reply_to' + assert message.sequence_number + assert message.enqueued_time_utc assert message.session_id == session_id + assert message.reply_to_session_id == 'reply_to_session_id' count += 1 message.complete() @@ -303,8 +328,10 @@ def test_session_by_servicebus_client_iter_messages_with_retrieve_deferred_recei for message in receiver: count += 1 print_message(_logger, message) - assert message.user_properties[b'DeadLetterReason'] == b'Testing reason' - assert message.user_properties[b'DeadLetterErrorDescription'] == b'Testing description' + assert message.dead_letter_reason == 'Testing reason' + assert message.dead_letter_error_description == 'Testing description' + assert message.properties[b'DeadLetterReason'] == b'Testing reason' + assert message.properties[b'DeadLetterErrorDescription'] == b'Testing description' message.complete() assert count == 10 @@ -415,8 +442,10 @@ def test_session_by_servicebus_client_receive_with_retrieve_deadletter(self, ser for message in session: print_message(_logger, message) message.complete() - assert message.user_properties[b'DeadLetterReason'] == b'Testing reason' - assert message.user_properties[b'DeadLetterErrorDescription'] == b'Testing description' + assert message.dead_letter_reason == 'Testing reason' + assert message.dead_letter_error_description == 'Testing description' + assert message.properties[b'DeadLetterReason'] == b'Testing reason' + assert message.properties[b'DeadLetterErrorDescription'] == b'Testing description' count += 1 assert count == 10 @@ -510,7 +539,7 @@ def test_session_by_servicebus_client_renew_client_locks(self, servicebus_namesp try: for m in messages: with pytest.raises(TypeError): - expired = m.expired + expired = m._lock_expired assert m.locked_until_utc is None assert m.lock_token is not None time.sleep(5) @@ -557,9 +586,9 @@ def test_session_by_conn_str_receive_handler_with_autolockrenew(self, servicebus print("Starting first sleep") time.sleep(40) print("First sleep {}".format(receiver.session._locked_until_utc - utc_now())) - assert not receiver.session.expired + assert not receiver.session._lock_expired with pytest.raises(TypeError): - message.expired + message._lock_expired assert message.locked_until_utc is None with pytest.raises(TypeError): message.renew_lock() @@ -572,7 +601,7 @@ def test_session_by_conn_str_receive_handler_with_autolockrenew(self, servicebus time.sleep(40) # ensure renewer expires print("Second sleep {}".format(receiver.session._locked_until_utc - utc_now())) sleep_until_expired(receiver.session) # and then ensure it didn't slip a renew under the wire. - assert receiver.session.expired + assert receiver.session._lock_expired assert isinstance(receiver.session.auto_renew_error, AutoLockRenewTimeout) try: message.complete() @@ -632,11 +661,11 @@ def test_session_message_expiry(self, servicebus_namespace_connection_string, se print_message(_logger, messages[0]) time.sleep(60) with pytest.raises(TypeError): - messages[0].expired + messages[0]._lock_expired with pytest.raises(TypeError): messages[0].renew_lock() #TODO: Bug: Why was this 30s sleep before? compare with T1. - assert receiver.session.expired + assert receiver.session._lock_expired with pytest.raises(SessionLockExpired): messages[0].complete() with pytest.raises(SessionLockExpired): @@ -646,7 +675,7 @@ def test_session_message_expiry(self, servicebus_namespace_connection_string, se messages = receiver.receive_messages(max_wait_time=30) assert len(messages) == 1 print_message(_logger, messages[0]) - #assert messages[0].header.delivery_count # TODO confirm this with service + assert messages[0].delivery_count messages[0].complete() @@ -667,7 +696,7 @@ def test_session_schedule_message(self, servicebus_namespace_connection_string, content = str(uuid.uuid4()) message_id = uuid.uuid4() message = Message(content, session_id=session_id) - message.properties.message_id = message_id + message.message_id = message_id message.scheduled_enqueue_time_utc = enqueue_time sender.send_messages(message) @@ -680,7 +709,7 @@ def test_session_schedule_message(self, servicebus_namespace_connection_string, data = str(messages[0]) assert data == content - assert messages[0].properties.message_id == message_id + assert messages[0].message_id == message_id assert messages[0].scheduled_enqueue_time_utc == enqueue_time assert messages[0].scheduled_enqueue_time_utc == messages[0].enqueued_time_utc.replace(microsecond=0) assert len(messages) == 1 @@ -704,10 +733,10 @@ def test_session_schedule_multiple_messages(self, servicebus_namespace_connectio content = str(uuid.uuid4()) message_id_a = uuid.uuid4() message_a = Message(content, session_id=session_id) - message_a.properties.message_id = message_id_a + message_a.message_id = message_id_a message_id_b = uuid.uuid4() message_b = Message(content, session_id=session_id) - message_b.properties.message_id = message_id_b + message_b.message_id = message_id_b tokens = sender.schedule_messages([message_a, message_b], enqueue_time) assert len(tokens) == 2 @@ -721,7 +750,7 @@ def test_session_schedule_multiple_messages(self, servicebus_namespace_connectio data = str(messages[0]) assert data == content - assert messages[0].properties.message_id in (message_id_a, message_id_b) + assert messages[0].message_id in (message_id_a, message_id_b) assert messages[0].scheduled_enqueue_time_utc == enqueue_time assert messages[0].scheduled_enqueue_time_utc == messages[0].enqueued_time_utc.replace(microsecond=0) assert len(messages) == 2 @@ -778,7 +807,7 @@ def test_session_get_set_state_with_receiver(self, servicebus_namespace_connecti session.session.set_session_state("first_state") count = 0 for m in session: - assert m.properties.group_id == session_id.encode('utf-8') + assert m.session_id == session_id count += 1 session.session.get_session_state() assert count == 3 diff --git a/sdk/servicebus/azure-servicebus/tests/test_subscriptions.py b/sdk/servicebus/azure-servicebus/tests/test_subscriptions.py index 980e813b890a..a58aff4422cd 100644 --- a/sdk/servicebus/azure-servicebus/tests/test_subscriptions.py +++ b/sdk/servicebus/azure-servicebus/tests/test_subscriptions.py @@ -168,6 +168,8 @@ def test_subscription_by_servicebus_client_receive_batch_with_deadletter(self, s for message in dl_receiver: message.complete() count += 1 - assert message.user_properties[b'DeadLetterReason'] == b'Testing reason' - assert message.user_properties[b'DeadLetterErrorDescription'] == b'Testing description' + assert message.dead_letter_reason == 'Testing reason' + assert message.dead_letter_error_description == 'Testing description' + assert message.properties[b'DeadLetterReason'] == b'Testing reason' + assert message.properties[b'DeadLetterErrorDescription'] == b'Testing description' assert count == 10 diff --git a/sdk/servicebus/azure-servicebus/tests/utilities.py b/sdk/servicebus/azure-servicebus/tests/utilities.py index b22b4c4249ca..2d46b00daa29 100644 --- a/sdk/servicebus/azure-servicebus/tests/utilities.py +++ b/sdk/servicebus/azure-servicebus/tests/utilities.py @@ -28,12 +28,10 @@ def print_message(_logger, message): _logger.info("Receiving: {}".format(message)) _logger.debug("Time to live: {}".format(message.time_to_live)) _logger.debug("Sequence number: {}".format(message.sequence_number)) - _logger.debug("Enqueue Sequence numger: {}".format(message.enqueue_sequence_number)) - _logger.debug("Partition ID: {}".format(message.partition_id)) + _logger.debug("Enqueue Sequence numger: {}".format(message.enqueued_sequence_number)) _logger.debug("Partition Key: {}".format(message.partition_key)) - _logger.debug("User Properties: {}".format(message.user_properties)) - _logger.debug("Annotations: {}".format(message.annotations)) - _logger.debug("Delivery count: {}".format(message.header.delivery_count)) + _logger.debug("Properties: {}".format(message.properties)) + _logger.debug("Delivery count: {}".format(message.delivery_count)) try: _logger.debug("Locked until: {}".format(message.locked_until_utc)) _logger.debug("Lock Token: {}".format(message.lock_token))