diff --git a/sdk/servicebus/azure-servicebus/CHANGELOG.md b/sdk/servicebus/azure-servicebus/CHANGELOG.md index 8eedf164668c..473f94715dcc 100644 --- a/sdk/servicebus/azure-servicebus/CHANGELOG.md +++ b/sdk/servicebus/azure-servicebus/CHANGELOG.md @@ -6,6 +6,9 @@ * Added method `get_topic_sender` in `ServiceBusClient` to get a `ServiceBusSender` for a topic. * Added method `get_subscription_receiver` in `ServiceBusClient` to get a `ServiceBusReceiver` for a subscription under specific topic. +* Added support for scheduling messages and scheduled message cancellation. + - Use `ServiceBusSender.schedule(messages, schedule_time_utc)` for scheduling messages. + - Use `ServiceBusSender.cancel_scheduled_messages(sequence_numbers)` for scheduled messages cancellation. * `ServiceBusSender.send()` can now send a list of messages in one call, if they fit into a single batch. If they do not fit a `ValueError` is thrown. * `BatchMessage.add()` and `ServiceBusSender.send()` raises `MessageContentTooLarge`, which is a subclass of `ValueError` if the content is over-sized. * `ServiceBusReceiver.receive()` raises `ValueError` if the max_batch_size is greater than the prefetch of `ServiceBusClient`. @@ -21,6 +24,7 @@ * Session receivers are now created via their own top level functions, e.g. `get_queue_sesison_receiver` and `get_subscription_session_receiver`. Non session receivers no longer take session_id as a paramter. * `ServiceBusSender.send()` no longer takes a timeout parameter, as it should be redundant with retry options provided when creating the client. * Exception imports have been removed from module `azure.servicebus`. Import from `azure.servicebus.exceptions` instead. +* `ServiceBusSender.schedule()` has swapped the ordering of parameters `schedule_time_utc` and `messages` for better consistency with `send()` syntax. ## 7.0.0b1 (2020-04-06) diff --git a/sdk/servicebus/azure-servicebus/azure/servicebus/_common/message.py b/sdk/servicebus/azure-servicebus/azure/servicebus/_common/message.py index 904b0b14d7fa..8d1a0daa5c57 100644 --- a/sdk/servicebus/azure-servicebus/azure/servicebus/_common/message.py +++ b/sdk/servicebus/azure-servicebus/azure/servicebus/_common/message.py @@ -240,27 +240,40 @@ def time_to_live(self, value): self.header.time_to_live = int(value) * 1000 @property - def body(self): - # type: () -> Union[bytes, Generator[bytes]] - """The body of the Message. + def scheduled_enqueue_time_utc(self): + # type: () -> Optional[datetime.datetime] + """Get or set the utc scheduled enqueue time to the message. + This property can be used for scheduling when sending a message through `ServiceBusSender.send` method. + If cancelling scheduled messages is required, you should use the `ServiceBusSender.schedule` method, + which returns sequence numbers that can be used for future cancellation. + `scheduled_enqueue_time_utc` is None if not set. - :rtype: bytes or generator[bytes] + :rtype: ~datetime.datetime """ - return self.message.get_data() + if self.message.annotations: + timestamp = self.message.annotations.get(_X_OPT_SCHEDULED_ENQUEUE_TIME) + if timestamp: + in_seconds = timestamp/1000.0 + return utc_from_timestamp(in_seconds) + return None - def schedule(self, schedule_time_utc): + @scheduled_enqueue_time_utc.setter + def scheduled_enqueue_time_utc(self, value): # type: (datetime.datetime) -> None - """Add a specific utc enqueue time to the message. - - :param schedule_time_utc: The scheduled utc time to enqueue the message. - :type schedule_time_utc: ~datetime.datetime - :rtype: None - """ if not self.properties.message_id: self.properties.message_id = str(uuid.uuid4()) if not self.message.annotations: self.message.annotations = {} - self.message.annotations[types.AMQPSymbol(_X_OPT_SCHEDULED_ENQUEUE_TIME)] = schedule_time_utc + self.message.annotations[types.AMQPSymbol(_X_OPT_SCHEDULED_ENQUEUE_TIME)] = value + + @property + def body(self): + # type: () -> Union[bytes, Generator[bytes]] + """The body of the Message. + + :rtype: bytes or generator[bytes] + """ + return self.message.get_data() class BatchMessage(object): @@ -405,20 +418,6 @@ def enqueued_time_utc(self): return utc_from_timestamp(in_seconds) return None - @property - def scheduled_enqueue_time_utc(self): - # type: () -> Optional[datetime.datetime] - """ - - :rtype: ~datetime.datetime - """ - if self.message.annotations: - timestamp = self.message.annotations.get(_X_OPT_SCHEDULED_ENQUEUE_TIME) - if timestamp: - in_seconds = timestamp/1000.0 - return utc_from_timestamp(in_seconds) - return None - @property def sequence_number(self): # type: () -> Optional[int] diff --git a/sdk/servicebus/azure-servicebus/azure/servicebus/_servicebus_sender.py b/sdk/servicebus/azure-servicebus/azure/servicebus/_servicebus_sender.py index f5ca62e2f0d4..3d5dfed1f926 100644 --- a/sdk/servicebus/azure-servicebus/azure/servicebus/_servicebus_sender.py +++ b/sdk/servicebus/azure-servicebus/azure/servicebus/_servicebus_sender.py @@ -64,7 +64,10 @@ def _set_msg_timeout(self, timeout=None, last_exception=None): def _build_schedule_request(cls, schedule_time_utc, *messages): request_body = {MGMT_REQUEST_MESSAGES: []} for message in messages: - message.schedule(schedule_time_utc) + if not isinstance(message, Message): + raise ValueError("Scheduling batch messages only supports iterables containing Message Objects." + " Received instead: {}".format(message.__class__.__name__)) + message.scheduled_enqueue_time_utc = schedule_time_utc message_data = {} message_data[MGMT_REQUEST_MESSAGE_ID] = message.properties.message_id if message.properties.group_id: @@ -184,15 +187,15 @@ def _send(self, message, timeout=None, last_exception=None): self._set_msg_timeout(timeout, last_exception) self._handler.send_message(message.message) - def _schedule(self, message, schedule_time_utc): - # type: (Union[Message, BatchMessage], datetime.datetime) -> List[int] - """Send Message or BatchMessage to be enqueued at a specific time. + def schedule(self, messages, schedule_time_utc): + # type: (Union[Message, List[Message]], datetime.datetime) -> List[int] + """Send Message or multiple Messages to be enqueued at a specific time. Returns a list of the sequence numbers of the enqueued messages. - :param message: The messages to schedule. - :type message: ~azure.servicebus.Message or ~azure.servicebus.BatchMessage + :param messages: The message or list of messages to schedule. + :type messages: ~azure.servicebus.Message or list[~azure.servicebus.Message] :param schedule_time_utc: The utc date and time to enqueue the messages. :type schedule_time_utc: ~datetime.datetime - :rtype: List[int] + :rtype: list[int] .. admonition:: Example: @@ -205,17 +208,17 @@ def _schedule(self, message, schedule_time_utc): """ # pylint: disable=protected-access self._open() - if isinstance(message, BatchMessage): - request_body = self._build_schedule_request(schedule_time_utc, *message._messages) + if isinstance(messages, Message): + request_body = self._build_schedule_request(schedule_time_utc, messages) else: - request_body = self._build_schedule_request(schedule_time_utc, message) + request_body = self._build_schedule_request(schedule_time_utc, *messages) return self._mgmt_request_response_with_retry( REQUEST_RESPONSE_SCHEDULE_MESSAGE_OPERATION, request_body, mgmt_handlers.schedule_op ) - def _cancel_scheduled_messages(self, sequence_numbers): + def cancel_scheduled_messages(self, sequence_numbers): # type: (Union[int, List[int]]) -> None """ Cancel one or more messages that have previously been scheduled and are still pending. @@ -223,6 +226,8 @@ def _cancel_scheduled_messages(self, sequence_numbers): :param sequence_numbers: The sequence numbers of the scheduled messages. :type sequence_numbers: int or list[int] :rtype: None + :raises: ~azure.servicebus.exceptions.ServiceBusError if messages cancellation failed due to message already + cancelled or enqueued. .. admonition:: Example: diff --git a/sdk/servicebus/azure-servicebus/azure/servicebus/aio/_servicebus_sender_async.py b/sdk/servicebus/azure-servicebus/azure/servicebus/aio/_servicebus_sender_async.py index 3b51a77eb4ae..32e65996da7f 100644 --- a/sdk/servicebus/azure-servicebus/azure/servicebus/aio/_servicebus_sender_async.py +++ b/sdk/servicebus/azure-servicebus/azure/servicebus/aio/_servicebus_sender_async.py @@ -133,15 +133,15 @@ async def _send(self, message, timeout=None, last_exception=None): self._set_msg_timeout(timeout, last_exception) await self._handler.send_message_async(message.message) - async def _schedule(self, message, schedule_time_utc): - # type: (Union[Message, BatchMessage], datetime.datetime) -> List[int] - """Send Message or BatchMessage to be enqueued at a specific time. + async def schedule(self, messages, schedule_time_utc): + # type: (Union[Message, List[Message]], datetime.datetime) -> List[int] + """Send Message or multiple Messages to be enqueued at a specific time. Returns a list of the sequence numbers of the enqueued messages. - :param message: The messages to schedule. - :type message: ~azure.servicebus.Message or ~azure.servicebus.BatchMessage + :param messages: The message or list of messages to schedule. + :type messages: ~azure.servicebus.Message or list[~azure.servicebus.Message] :param schedule_time_utc: The utc date and time to enqueue the messages. :type schedule_time_utc: ~datetime.datetime - :rtype: List[int] + :rtype: list[int] .. admonition:: Example: @@ -154,24 +154,26 @@ async def _schedule(self, message, schedule_time_utc): """ # pylint: disable=protected-access await self._open() - if isinstance(message, BatchMessage): - request_body = self._build_schedule_request(schedule_time_utc, *message._messages) + if isinstance(messages, Message): + request_body = self._build_schedule_request(schedule_time_utc, messages) else: - request_body = self._build_schedule_request(schedule_time_utc, message) + request_body = self._build_schedule_request(schedule_time_utc, *messages) return await self._mgmt_request_response_with_retry( REQUEST_RESPONSE_SCHEDULE_MESSAGE_OPERATION, request_body, mgmt_handlers.schedule_op ) - async def _cancel_scheduled_messages(self, sequence_numbers): + async def cancel_scheduled_messages(self, sequence_numbers): # type: (Union[int, List[int]]) -> None """ Cancel one or more messages that have previously been scheduled and are still pending. - :param sequence_numbers: he sequence numbers of the scheduled messages. + :param sequence_numbers: The sequence numbers of the scheduled messages. :type sequence_numbers: int or list[int] :rtype: None + :raises: ~azure.servicebus.exceptions.ServiceBusError if messages cancellation failed due to message already + cancelled or enqueued. .. admonition:: Example: diff --git a/sdk/servicebus/azure-servicebus/migration_guide.md b/sdk/servicebus/azure-servicebus/migration_guide.md index 7d74fbb615e2..df54ffe743cb 100644 --- a/sdk/servicebus/azure-servicebus/migration_guide.md +++ b/sdk/servicebus/azure-servicebus/migration_guide.md @@ -49,6 +49,13 @@ semantics with the sender or receiver lifetime. | `QueueClient.from_connection_string().send() and ServiceBusClient.from_connection_string().get_queue().get_sender().send()`| `ServiceBusClient.from_connection_string().get_queue_sender().send()`| [Get a sender and send a message](./samples/sync_samples/send_queue.py) | | `queue_client.send(BatchMessage(["data 1", "data 2", ...]))`| `batch = queue_sender.create_batch() batch.add(Message("data 1")) queue_sender.send(batch)`| [Create and send a batch of messages](./samples/sync_samples/send_queue.py) | +### Scheduling messages and cancelling scheduled messages + +| In v0.50 | Equivalent in v7 | Sample | +|---|---|---| +| `queue_client.get_sender().schedule(schedule_time_utc, message1, message2)` | `sb_client.get_queue_sender().schedule([message1, message2], schedule_time_utc)` | [Schedule messages](./samples/sync_samples/schedule_messages_and_cancellation.py) | +| `queue_client.get_sender().cancel_scheduled_messages(sequence_number1, sequence_number2)`| `sb_client.get_queue_sender().cancel_scheduled_messages([sequence_number1, sequence_number2])` | [Cancel scheduled messages](./samples/sync_samples/schedule_messages_and_cancellation.py)| + ### Working with sessions | In v0.50 | Equivalent in v7 | Sample | diff --git a/sdk/servicebus/azure-servicebus/samples/README.md b/sdk/servicebus/azure-servicebus/samples/README.md index 10546948e2a7..fc2453a9e592 100644 --- a/sdk/servicebus/azure-servicebus/samples/README.md +++ b/sdk/servicebus/azure-servicebus/samples/README.md @@ -33,6 +33,9 @@ Both [sync version](./sync_samples) and [async version](./async_samples) of samp - [session_send_receive.py](./sync_samples/session_send_receive.py) ([async_version](./async_samples/session_send_receive_async.py)) - Examples to send messages to and receive messages from a session-enabled service bus queue: - Send messages to a session-enabled queue - Receive messages from session-enabled queue +- [schedule_messages_and_cancellation](./sync_samples/schedule_messages_and_cancellation.py) ([async_version](./async_samples/schedule_messages_and_cancellation_async.py)) - Examples to schedule messages and cancel scheduled message: + - Schedule a single message or multiples messages to a queue + - Cancel scheduled messages from a queue - [client_identity_authentication.py](./sync_samples/client_identity_authentication.py) ([async_version](./async_samples/client_identity_authentication_async.py)) - Examples to authenticate the client by Azure Activate Directory - Authenticate and create the client utilizing the `azure.identity` library - [proxy.py](./sync_samples/proxy.py) ([async_version](./async_samples/proxy_async.py)) - Examples to send message behind a proxy: diff --git a/sdk/servicebus/azure-servicebus/samples/async_samples/sample_code_servicebus_async.py b/sdk/servicebus/azure-servicebus/samples/async_samples/sample_code_servicebus_async.py index 90657b76e374..105c9920dac3 100644 --- a/sdk/servicebus/azure-servicebus/samples/async_samples/sample_code_servicebus_async.py +++ b/sdk/servicebus/azure-servicebus/samples/async_samples/sample_code_servicebus_async.py @@ -12,6 +12,7 @@ - Receive and settle deferred messages """ import os +import datetime import asyncio from azure.servicebus.aio import ServiceBusClient from azure.servicebus import Message @@ -96,7 +97,7 @@ async def example_create_servicebus_sender_async(): topic_name = os.environ['SERVICE_BUS_TOPIC_NAME'] servicebus_client = ServiceBusClient.from_connection_string(conn_str=servicebus_connection_str) async with servicebus_client: - queue_sender = servicebus_client.get_topic_sender(topic_name=topic_name) + topic_sender = servicebus_client.get_topic_sender(topic_name=topic_name) # [END create_topic_sender_from_sb_client_async] return queue_sender @@ -271,8 +272,24 @@ async def example_session_ops_async(): break +async def example_schedule_ops_async(): + servicebus_sender = await example_create_servicebus_sender_async() + # [START scheduling_messages_async] + async with servicebus_sender: + scheduled_time_utc = datetime.datetime.utcnow() + datetime.timedelta(seconds=30) + scheduled_messages = [Message("Scheduled message") for _ in range(10)] + sequence_nums = await servicebus_sender.schedule(scheduled_messages, scheduled_time_utc) + # [END scheduling_messages_async] + + # [START cancel_scheduled_messages_async] + async with servicebus_sender: + await servicebus_sender.cancel_scheduled_messages(sequence_nums) + # [END cancel_scheduled_messages_async] + + if __name__ == "__main__": loop = asyncio.get_event_loop() loop.run_until_complete(example_send_and_receive_async()) loop.run_until_complete(example_receive_deferred_async()) + loop.run_until_complete(example_schedule_ops_async()) # loop.run_until_complete(example_session_ops_async()) diff --git a/sdk/servicebus/azure-servicebus/samples/async_samples/schedule_messages_and_cancellation_async.py b/sdk/servicebus/azure-servicebus/samples/async_samples/schedule_messages_and_cancellation_async.py new file mode 100644 index 000000000000..05458e1169dc --- /dev/null +++ b/sdk/servicebus/azure-servicebus/samples/async_samples/schedule_messages_and_cancellation_async.py @@ -0,0 +1,56 @@ +#!/usr/bin/env python + +# -------------------------------------------------------------------------------------------- +# Copyright (c) Microsoft Corporation. All rights reserved. +# Licensed under the MIT License. See License.txt in the project root for license information. +# -------------------------------------------------------------------------------------------- + +""" +Example to show scheduling messages to and cancelling messages from a Service Bus Queue asynchronously. +""" + +# pylint: disable=C0111 + +import os +import asyncio +import datetime +from azure.servicebus.aio import ServiceBusClient +from azure.servicebus import Message + +CONNECTION_STR = os.environ['SERVICE_BUS_CONNECTION_STR'] +QUEUE_NAME = os.environ["SERVICE_BUS_QUEUE_NAME"] + + +async def schedule_single_message(sender): + message = Message("Message to be scheduled") + scheduled_time_utc = datetime.datetime.utcnow() + datetime.timedelta(seconds=30) + sequence_number = await sender.schedule(message, scheduled_time_utc) + return sequence_number + + +async def schedule_multiple_messages(sender): + messages_to_schedule = [] + for _ in range(10): + messages_to_schedule.append(Message("Message to be scheduled")) + + scheduled_time_utc = datetime.datetime.utcnow() + datetime.timedelta(seconds=30) + sequence_numbers = await sender.schedule(messages_to_schedule, scheduled_time_utc) + return sequence_numbers + + +async def main(): + servicebus_client = ServiceBusClient.from_connection_string(conn_str=CONNECTION_STR, logging_enable=True) + async with servicebus_client: + sender = servicebus_client.get_queue_sender(queue_name=QUEUE_NAME) + async with sender: + sequence_number = await schedule_single_message(sender) + print("Single message is scheduled and sequence number is {}".format(sequence_number)) + sequence_numbers = await schedule_multiple_messages(sender) + print("Multiple messages are scheduled and sequence numbers are {}".format(sequence_numbers)) + + await sender.cancel_scheduled_messages(sequence_number) + await sender.cancel_scheduled_messages(sequence_numbers) + print("All scheduled messages are cancelled.") + +loop = asyncio.get_event_loop() +loop.run_until_complete(main()) diff --git a/sdk/servicebus/azure-servicebus/samples/sync_samples/sample_code_servicebus.py b/sdk/servicebus/azure-servicebus/samples/sync_samples/sample_code_servicebus.py index 55d689e4bf5a..9155886e31b0 100644 --- a/sdk/servicebus/azure-servicebus/samples/sync_samples/sample_code_servicebus.py +++ b/sdk/servicebus/azure-servicebus/samples/sync_samples/sample_code_servicebus.py @@ -93,7 +93,7 @@ def example_create_servicebus_sender_sync(): topic_name = os.environ['SERVICE_BUS_TOPIC_NAME'] servicebus_client = ServiceBusClient.from_connection_string(conn_str=servicebus_connection_str) with servicebus_client: - queue_sender = servicebus_client.get_topic_sender(topic_name=topic_name) + topic_sender = servicebus_client.get_topic_sender(topic_name=topic_name) # [END create_topic_sender_from_sb_client_sync] return queue_sender @@ -295,6 +295,22 @@ def example_session_ops_sync(): break +def example_schedule_ops_sync(): + servicebus_sender = example_create_servicebus_sender_sync() + # [START scheduling_messages] + with servicebus_sender: + scheduled_time_utc = datetime.datetime.utcnow() + datetime.timedelta(seconds=30) + scheduled_messages = [Message("Scheduled message") for _ in range(10)] + sequence_nums = servicebus_sender.schedule(scheduled_messages, scheduled_time_utc) + # [END scheduling_messages] + + # [START cancel_scheduled_messages] + with servicebus_sender: + servicebus_sender.cancel_scheduled_messages(sequence_nums) + # [END cancel_scheduled_messages] + + example_send_and_receive_sync() example_receive_deferred_sync() +example_schedule_ops_sync() # example_session_ops_sync() diff --git a/sdk/servicebus/azure-servicebus/samples/sync_samples/schedule_messages_and_cancellation.py b/sdk/servicebus/azure-servicebus/samples/sync_samples/schedule_messages_and_cancellation.py new file mode 100644 index 000000000000..8d43f3c42deb --- /dev/null +++ b/sdk/servicebus/azure-servicebus/samples/sync_samples/schedule_messages_and_cancellation.py @@ -0,0 +1,50 @@ +#!/usr/bin/env python + +# -------------------------------------------------------------------------------------------- +# Copyright (c) Microsoft Corporation. All rights reserved. +# Licensed under the MIT License. See License.txt in the project root for license information. +# -------------------------------------------------------------------------------------------- + +""" +Example to show scheduling messages to and cancelling messages from a Service Bus Queue. +""" + +# pylint: disable=C0111 + +import os +import datetime +from azure.servicebus import ServiceBusClient, Message + +CONNECTION_STR = os.environ['SERVICE_BUS_CONNECTION_STR'] +QUEUE_NAME = os.environ["SERVICE_BUS_QUEUE_NAME"] + + +def schedule_single_message(sender): + message = Message("Message to be scheduled") + scheduled_time_utc = datetime.datetime.utcnow() + datetime.timedelta(seconds=30) + sequence_number = sender.schedule(message, scheduled_time_utc) + return sequence_number + + +def schedule_multiple_messages(sender): + messages_to_schedule = [] + for _ in range(10): + messages_to_schedule.append(Message("Message to be scheduled")) + + scheduled_time_utc = datetime.datetime.utcnow() + datetime.timedelta(seconds=30) + sequence_numbers = sender.schedule(messages_to_schedule, scheduled_time_utc) + return sequence_numbers + + +servicebus_client = ServiceBusClient.from_connection_string(conn_str=CONNECTION_STR, logging_enable=True) +with servicebus_client: + sender = servicebus_client.get_queue_sender(queue_name=QUEUE_NAME) + with sender: + sequence_number = schedule_single_message(sender) + print("Single message is scheduled and sequence number is {}".format(sequence_number)) + sequence_numbers = schedule_multiple_messages(sender) + print("Multiple messages are scheduled and sequence numbers are {}".format(sequence_numbers)) + + sender.cancel_scheduled_messages(sequence_number) + sender.cancel_scheduled_messages(sequence_numbers) + print("All scheduled messages are cancelled.") diff --git a/sdk/servicebus/azure-servicebus/tests/async_tests/test_queues_async.py b/sdk/servicebus/azure-servicebus/tests/async_tests/test_queues_async.py index d9644d2e765e..13b7e3d453e3 100644 --- a/sdk/servicebus/azure-servicebus/tests/async_tests/test_queues_async.py +++ b/sdk/servicebus/azure-servicebus/tests/async_tests/test_queues_async.py @@ -957,7 +957,6 @@ async def test_async_queue_message_batch(self, servicebus_namespace_connection_s print_message(_logger, m) await m.complete() - @pytest.mark.skip(reason="requires scheduler") @pytest.mark.liveTest @pytest.mark.live_test_only @CachedResourceGroupPreparer(name_prefix='servicebustest') @@ -974,7 +973,7 @@ async def test_async_queue_schedule_message(self, servicebus_namespace_connectio message_id = uuid.uuid4() message = Message(content) message.properties.message_id = message_id - message.schedule(enqueue_time) + message.scheduled_enqueue_time_utc = enqueue_time await sender.send(message) messages = await receiver.receive(max_wait_time=120) @@ -992,7 +991,6 @@ async def test_async_queue_schedule_message(self, servicebus_namespace_connectio else: raise Exception("Failed to receive scheduled message.") - @pytest.mark.skip(reason="requires scheduler") @pytest.mark.liveTest @pytest.mark.live_test_only @CachedResourceGroupPreparer(name_prefix='servicebustest') @@ -1012,7 +1010,7 @@ async def test_async_queue_schedule_multiple_messages(self, servicebus_namespace message_id_b = uuid.uuid4() message_b = Message(content) message_b.properties.message_id = message_id_b - tokens = await sender.schedule(enqueue_time, message_a, message_b) + tokens = await sender.schedule([message_a, message_b], enqueue_time) assert len(tokens) == 2 recv = await receiver.receive(max_wait_time=120) @@ -1048,10 +1046,10 @@ async def test_async_queue_cancel_scheduled_messages(self, servicebus_namespace_ async with sb_client.get_queue_sender(servicebus_queue.name) as sender: message_a = Message("Test scheduled message") message_b = Message("Test scheduled message") - tokens = await sender.schedule(enqueue_time, message_a, message_b) + tokens = await sender.schedule([message_a, message_b], enqueue_time) assert len(tokens) == 2 - await sender.cancel_scheduled_messages(*tokens) + await sender.cancel_scheduled_messages(tokens) messages = await receiver.receive(max_wait_time=120) assert len(messages) == 0 diff --git a/sdk/servicebus/azure-servicebus/tests/async_tests/test_sessions_async.py b/sdk/servicebus/azure-servicebus/tests/async_tests/test_sessions_async.py index 3b490e4ff8e7..9427547b1d30 100644 --- a/sdk/servicebus/azure-servicebus/tests/async_tests/test_sessions_async.py +++ b/sdk/servicebus/azure-servicebus/tests/async_tests/test_sessions_async.py @@ -596,7 +596,7 @@ async def test_async_session_schedule_message(self, servicebus_namespace_connect message_id = uuid.uuid4() message = Message(content, session_id=session_id) message.properties.message_id = message_id - message.schedule(enqueue_time) + message.scheduled_enqueue_time_utc = enqueue_time await sender.send(message) messages = [] diff --git a/sdk/servicebus/azure-servicebus/tests/test_queues.py b/sdk/servicebus/azure-servicebus/tests/test_queues.py index 1477f450f3a2..bf6dc50dce63 100644 --- a/sdk/servicebus/azure-servicebus/tests/test_queues.py +++ b/sdk/servicebus/azure-servicebus/tests/test_queues.py @@ -1084,7 +1084,7 @@ def test_queue_schedule_message(self, servicebus_namespace_connection_string, se message_id = uuid.uuid4() message = Message(content) message.properties.message_id = message_id - message.schedule(enqueue_time) + message.scheduled_enqueue_time_utc = enqueue_time sender.send(message) messages = receiver.receive(max_wait_time=120) @@ -1103,7 +1103,6 @@ def test_queue_schedule_message(self, servicebus_namespace_connection_string, se raise Exception("Failed to receive schdeduled message.") - @pytest.mark.skip("Pending message scheduling functionality") @pytest.mark.liveTest @pytest.mark.live_test_only @CachedResourceGroupPreparer(name_prefix='servicebustest') @@ -1125,11 +1124,11 @@ def test_queue_schedule_multiple_messages(self, servicebus_namespace_connection_ message_id_b = uuid.uuid4() message_b = Message(content) message_b.properties.message_id = message_id_b - tokens = sender.schedule(enqueue_time, message_a, message_b) + tokens = sender.schedule([message_a, message_b], enqueue_time) assert len(tokens) == 2 - messages = receiver.fetch_next(timeout=120) - messages.extend(receiver.fetch_next(timeout=5)) + messages = receiver.receive(max_wait_time=120) + messages.extend(receiver.receive(max_wait_time=5)) if messages: try: data = str(messages[0]) @@ -1145,7 +1144,6 @@ def test_queue_schedule_multiple_messages(self, servicebus_namespace_connection_ raise Exception("Failed to receive schdeduled message.") - @pytest.mark.skip(reason="Pending message scheduling functionality") @pytest.mark.liveTest @pytest.mark.live_test_only @CachedResourceGroupPreparer(name_prefix='servicebustest') @@ -1161,10 +1159,10 @@ def test_queue_cancel_scheduled_messages(self, servicebus_namespace_connection_s with sb_client.get_queue_sender(servicebus_queue.name) as sender: message_a = Message("Test scheduled message") message_b = Message("Test scheduled message") - tokens = sender.schedule(enqueue_time, message_a, message_b) + tokens = sender.schedule([message_a, message_b], enqueue_time) assert len(tokens) == 2 - sender.cancel_scheduled_messages(*tokens) + sender.cancel_scheduled_messages(tokens) messages = receiver.receive(max_wait_time=120) try: