Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
5 changes: 5 additions & 0 deletions sdk/storage/azure-storage-queue/CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,11 @@

### Features Added
- Added support for `create_queue_if_not_exists()` for `QueueClient`
- Added support for `max_messages` in `receive_messages()` to specify the maximum number of messages to receive from the queue.

### Other Changes
- Updated documentation for `receive_messages()` to explain iterator behavior and life-cycle.
- Added a sample to `queue_samples_message.py` (and async-equivalent) showcasing the use of `max_messages` in `receive_messages()`.

## 12.2.0 (2022-03-08)

Expand Down
13 changes: 12 additions & 1 deletion sdk/storage/azure-storage-queue/azure/storage/queue/_models.py
Original file line number Diff line number Diff line change
Expand Up @@ -264,8 +264,10 @@ class MessagesPaged(PageIterator):
:param callable command: Function to retrieve the next page of items.
:param int results_per_page: The maximum number of messages to retrieve per
call.
:param int max_messages: The maximum number of messages to retrieve from
the queue.
"""
def __init__(self, command, results_per_page=None, continuation_token=None):
def __init__(self, command, results_per_page=None, continuation_token=None, max_messages=None):
if continuation_token is not None:
raise ValueError("This operation does not support continuation token")

Expand All @@ -275,9 +277,16 @@ def __init__(self, command, results_per_page=None, continuation_token=None):
)
self._command = command
self.results_per_page = results_per_page
self._max_messages = max_messages

def _get_next_cb(self, continuation_token):
try:
if self._max_messages is not None:
if self.results_per_page is None:
self.results_per_page = 1
if self._max_messages < 1:
raise StopIteration("End of paging")
self.results_per_page = min(self.results_per_page, self._max_messages)
return self._command(number_of_messages=self.results_per_page)
except HttpResponseError as error:
process_storage_error(error)
Expand All @@ -286,6 +295,8 @@ def _extract_data_cb(self, messages): # pylint: disable=no-self-use
# There is no concept of continuation token, so raising on my own condition
if not messages:
raise StopIteration("End of paging")
if self._max_messages is not None:
self._max_messages = self._max_messages - len(messages)
return "TOKEN_IGNORED", [QueueMessage._from_generated(q) for q in messages] # pylint: disable=protected-access


Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -588,7 +588,9 @@ def receive_messages(self, **kwargs):
content and a pop_receipt value, which is required to delete the message.
The message is not automatically deleted from the queue, but after it has
been retrieved, it is not visible to other clients for the time interval
specified by the visibility_timeout parameter.
specified by the visibility_timeout parameter. The iterator will continuously
fetch messages until the queue is empty or max_messages is reached (if max_messages
is set).

If the key-encryption-key or resolver field is set on the local service object, the messages will be
decrypted before being returned.
Expand Down Expand Up @@ -618,6 +620,8 @@ def receive_messages(self, **kwargs):
should be set to a value smaller than the time-to-live value.
:keyword int timeout:
The server timeout, expressed in seconds.
:keyword int max_messages:
An integer that specifies the maximum number of messages to retrieve from the queue.
:return:
Returns a message iterator of dict-like Message objects.
:rtype: ~azure.core.paging.ItemPaged[~azure.storage.queue.QueueMessage]
Expand All @@ -634,6 +638,7 @@ def receive_messages(self, **kwargs):
messages_per_page = kwargs.pop('messages_per_page', None)
visibility_timeout = kwargs.pop('visibility_timeout', None)
timeout = kwargs.pop('timeout', None)
max_messages = kwargs.pop('max_messages', None)
self._config.message_decode_policy.configure(
require_encryption=self.require_encryption,
key_encryption_key=self.key_encryption_key,
Expand All @@ -646,7 +651,11 @@ def receive_messages(self, **kwargs):
cls=self._config.message_decode_policy,
**kwargs
)
return ItemPaged(command, results_per_page=messages_per_page, page_iterator_class=MessagesPaged)
if max_messages is not None and messages_per_page is not None:
if max_messages < messages_per_page:
raise ValueError("max_messages must be greater or equal to messages_per_page")
return ItemPaged(command, results_per_page=messages_per_page,
page_iterator_class=MessagesPaged, max_messages=max_messages)
except HttpResponseError as error:
process_storage_error(error)

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -21,8 +21,10 @@ class MessagesPaged(AsyncPageIterator):
:param callable command: Function to retrieve the next page of items.
:param int results_per_page: The maximum number of messages to retrieve per
call.
:param int max_messages: The maximum number of messages to retrieve from
the queue.
"""
def __init__(self, command, results_per_page=None, continuation_token=None):
def __init__(self, command, results_per_page=None, continuation_token=None, max_messages=None):
if continuation_token is not None:
raise ValueError("This operation does not support continuation token")

Expand All @@ -32,9 +34,16 @@ def __init__(self, command, results_per_page=None, continuation_token=None):
)
self._command = command
self.results_per_page = results_per_page
self._max_messages = max_messages

async def _get_next_cb(self, continuation_token):
try:
if self._max_messages is not None:
if self.results_per_page is None:
self.results_per_page = 1
if self._max_messages < 1:
raise StopAsyncIteration("End of paging")
self.results_per_page = min(self.results_per_page, self._max_messages)
return await self._command(number_of_messages=self.results_per_page)
except HttpResponseError as error:
process_storage_error(error)
Expand All @@ -43,6 +52,8 @@ async def _extract_data_cb(self, messages):
# There is no concept of continuation token, so raising on my own condition
if not messages:
raise StopAsyncIteration("End of paging")
if self._max_messages is not None:
self._max_messages = self._max_messages - len(messages)
return "TOKEN_IGNORED", [QueueMessage._from_generated(q) for q in messages] # pylint: disable=protected-access


Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -490,7 +490,9 @@ def receive_messages(self, **kwargs):
content and a pop_receipt value, which is required to delete the message.
The message is not automatically deleted from the queue, but after it has
been retrieved, it is not visible to other clients for the time interval
specified by the visibility_timeout parameter.
specified by the visibility_timeout parameter. The iterator will continuously
fetch messages until the queue is empty or max_messages is reached (if max_messages
is set).

If the key-encryption-key or resolver field is set on the local service object, the messages will be
decrypted before being returned.
Expand All @@ -511,6 +513,8 @@ def receive_messages(self, **kwargs):
should be set to a value smaller than the time-to-live value.
:keyword int timeout:
The server timeout, expressed in seconds.
:keyword int max_messages:
An integer that specifies the maximum number of messages to retrieve from the queue.
:return:
Returns a message iterator of dict-like Message objects.
:rtype: ~azure.core.async_paging.AsyncItemPaged[~azure.storage.queue.QueueMessage]
Expand All @@ -527,6 +531,7 @@ def receive_messages(self, **kwargs):
messages_per_page = kwargs.pop('messages_per_page', None)
visibility_timeout = kwargs.pop('visibility_timeout', None)
timeout = kwargs.pop('timeout', None)
max_messages = kwargs.pop('max_messages', None)
self._config.message_decode_policy.configure(
require_encryption=self.require_encryption,
key_encryption_key=self.key_encryption_key,
Expand All @@ -540,7 +545,11 @@ def receive_messages(self, **kwargs):
cls=self._config.message_decode_policy,
**kwargs
)
return AsyncItemPaged(command, results_per_page=messages_per_page, page_iterator_class=MessagesPaged)
if max_messages is not None and messages_per_page is not None:
if max_messages < messages_per_page:
raise ValueError("max_messages must be greater or equal to messages_per_page")
return AsyncItemPaged(command, results_per_page=messages_per_page,
page_iterator_class=MessagesPaged, max_messages=max_messages)
except HttpResponseError as error:
process_storage_error(error)

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -300,6 +300,43 @@ def update_message(self):
# Delete the queue
queue.delete_queue()

def receive_messages_with_max_messages(self):
# Instantiate a queue client
from azure.storage.queue import QueueClient
queue = QueueClient.from_connection_string(self.connection_string, "myqueue9")

# Create the queue
queue.create_queue()

try:
queue.send_message(u"message1")
queue.send_message(u"message2")
queue.send_message(u"message3")
queue.send_message(u"message4")
queue.send_message(u"message5")
queue.send_message(u"message6")
queue.send_message(u"message7")
queue.send_message(u"message8")
queue.send_message(u"message9")
queue.send_message(u"message10")

# Receive messages one-by-one
messages = queue.receive_messages(max_messages=5)
for msg in messages:
print(msg.content)
queue.delete_message(msg)

# Only prints 5 messages because 'max_messages'=5
# >>message1
# >>message2
# >>message3
# >>message4
# >>message5

finally:
# Delete the queue
queue.delete_queue()


if __name__ == '__main__':
sample = QueueMessageSamples()
Expand All @@ -311,3 +348,4 @@ def update_message(self):
sample.delete_and_clear_messages()
sample.peek_messages()
sample.update_message()
sample.receive_messages_with_max_messages()
Original file line number Diff line number Diff line change
Expand Up @@ -279,6 +279,44 @@ async def update_message_async(self):
finally:
# Delete the queue
await queue.delete_queue()

async def receive_messages_with_max_messages(self):
# Instantiate a queue client
from azure.storage.queue.aio import QueueClient
queue = QueueClient.from_connection_string(self.connection_string, "myqueue7")

# Create the queue
async with queue:
await queue.create_queue()

try:
await queue.send_message(u"message1")
await queue.send_message(u"message2")
await queue.send_message(u"message3")
await queue.send_message(u"message4")
await queue.send_message(u"message5")
await queue.send_message(u"message6")
await queue.send_message(u"message7")
await queue.send_message(u"message8")
await queue.send_message(u"message9")
await queue.send_message(u"message10")

# Receive messages one-by-one
messages = queue.receive_messages(max_messages=5)
async for msg in messages:
print(msg.content)
await queue.delete_message(msg)

# Only prints 5 messages because 'max_messages'=5
# >>message1
# >>message2
# >>message3
# >>message4
# >>message5

finally:
# Delete the queue
await queue.delete_queue()


async def main():
Expand All @@ -290,7 +328,8 @@ async def main():
await sample.delete_and_clear_messages_async()
await sample.peek_messages_async()
await sample.update_message_async()
await sample.receive_messages_with_max_messages()

if __name__ == '__main__':
loop = asyncio.get_event_loop()
loop.run_until_complete(main())
loop.run_until_complete(main())
Loading