Skip to content
Merged
Show file tree
Hide file tree
Changes from 7 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
7 changes: 6 additions & 1 deletion sdk/storage/azure-storage-queue/CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,11 @@

### Features Added
- Added support for `create_if_not_exists()` for `QueueClient`
- Added support for `max_messages` in `receive_messages()` to specify the maximum number of messages to receive from the queue.

### Other Changes
- Updated documentation for `receive_messages()` to explain iterator behavior and life-cycle.
- Added a sample to `queue_samples_message.py` (and async-equivalent) showcasing the use of `max_messages` in `receieve_messages()`.

## 12.2.0 (2022-03-08)

Expand Down Expand Up @@ -189,4 +194,4 @@ https://aka.ms/azure-sdk-preview1-python.
- Error message now contains the ErrorCode from the x-ms-error-code header value.

## 1.0.0
- The package has switched from Apache 2.0 to the MIT license.
- The package has switched from Apache 2.0 to the MIT license.
15 changes: 14 additions & 1 deletion sdk/storage/azure-storage-queue/azure/storage/queue/_models.py
Original file line number Diff line number Diff line change
Expand Up @@ -264,8 +264,10 @@ class MessagesPaged(PageIterator):
:param callable command: Function to retrieve the next page of items.
:param int results_per_page: The maximum number of messages to retrieve per
call.
:param int max_messages: The maximum number of messages to retrieve from
the queue.
"""
def __init__(self, command, results_per_page=None, continuation_token=None):
def __init__(self, command, results_per_page=None, continuation_token=None, max_messages=None):
if continuation_token is not None:
raise ValueError("This operation does not support continuation token")

Expand All @@ -275,9 +277,20 @@ def __init__(self, command, results_per_page=None, continuation_token=None):
)
self._command = command
self.results_per_page = results_per_page
self.max_messages = max_messages

def _get_next_cb(self, continuation_token):
try:
if self.max_messages is not None:
if self.results_per_page is None:
self.results_per_page = 1
if self.max_messages == 0:
raise StopIteration("End of paging")
if self.results_per_page > self.max_messages:
self.results_per_page = self.max_messages
self.max_messages = 0
else:
self.max_messages = self.max_messages - self.results_per_page
return self._command(number_of_messages=self.results_per_page)
except HttpResponseError as error:
process_storage_error(error)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -16,7 +16,7 @@

import six

from azure.core.exceptions import HttpResponseError, ResourceExistsError
from azure.core.exceptions import HttpResponseError
from azure.core.paging import ItemPaged
from azure.core.tracing.decorator import distributed_trace
from ._serialize import get_api_version
Expand Down Expand Up @@ -242,28 +242,6 @@ def create_queue(self, **kwargs):
except HttpResponseError as error:
process_storage_error(error)

@distributed_trace
def create_if_not_exists(self, **kwargs):
# type: (Any) -> None
"""Creates a new queue in the storage account.

If a queue with the same name already exists, it is not changed.

:keyword Dict(str,str) metadata:
A dict containing name-value pairs to associate with the queue as
metadata. Note that metadata names preserve the case with which they
were created, but are case-insensitive when set or read.
:keyword int timeout:
The server timeout, expressed in seconds.
:return: None or the result of cls(response)
:rtype: None
:raises: StorageErrorException
"""
try:
return self.create_queue(**kwargs)
except ResourceExistsError:
return None

@distributed_trace
def delete_queue(self, **kwargs):
# type: (Any) -> None
Expand Down Expand Up @@ -588,7 +566,9 @@ def receive_messages(self, **kwargs):
content and a pop_receipt value, which is required to delete the message.
The message is not automatically deleted from the queue, but after it has
been retrieved, it is not visible to other clients for the time interval
specified by the visibility_timeout parameter.
specified by the visibility_timeout parameter. The iterator will continuously
fetch messages until the queue is empty or max_messages is reached (if max_messages
is set).

If the key-encryption-key or resolver field is set on the local service object, the messages will be
decrypted before being returned.
Expand Down Expand Up @@ -618,6 +598,8 @@ def receive_messages(self, **kwargs):
should be set to a value smaller than the time-to-live value.
:keyword int timeout:
The server timeout, expressed in seconds.
:keyword int max_messages:
An integer that specifies the maximum number of messages to retrieve from the queue.
:return:
Returns a message iterator of dict-like Message objects.
:rtype: ~azure.core.paging.ItemPaged[~azure.storage.queue.QueueMessage]
Expand All @@ -634,6 +616,7 @@ def receive_messages(self, **kwargs):
messages_per_page = kwargs.pop('messages_per_page', None)
visibility_timeout = kwargs.pop('visibility_timeout', None)
timeout = kwargs.pop('timeout', None)
max_messages = kwargs.pop('max_messages', None)
self._config.message_decode_policy.configure(
require_encryption=self.require_encryption,
key_encryption_key=self.key_encryption_key,
Expand All @@ -646,7 +629,8 @@ def receive_messages(self, **kwargs):
cls=self._config.message_decode_policy,
**kwargs
)
return ItemPaged(command, results_per_page=messages_per_page, page_iterator_class=MessagesPaged)
return ItemPaged(command, results_per_page=messages_per_page,
page_iterator_class=MessagesPaged, max_messages=max_messages)
except HttpResponseError as error:
process_storage_error(error)

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -21,8 +21,10 @@ class MessagesPaged(AsyncPageIterator):
:param callable command: Function to retrieve the next page of items.
:param int results_per_page: The maximum number of messages to retrieve per
call.
:param int max_messages: The maximum number of messages to retrieve from
the queue.
"""
def __init__(self, command, results_per_page=None, continuation_token=None):
def __init__(self, command, results_per_page=None, continuation_token=None, max_messages=None):
if continuation_token is not None:
raise ValueError("This operation does not support continuation token")

Expand All @@ -32,9 +34,20 @@ def __init__(self, command, results_per_page=None, continuation_token=None):
)
self._command = command
self.results_per_page = results_per_page
self.max_messages = max_messages

async def _get_next_cb(self, continuation_token):
try:
if self.max_messages is not None:
if self.results_per_page is None:
self.results_per_page = 1
if self.max_messages == 0:
raise StopAsyncIteration("End of paging")
if self.results_per_page > self.max_messages:
self.results_per_page = self.max_messages
self.max_messages = 0
else:
self.max_messages = self.max_messages - self.results_per_page
return await self._command(number_of_messages=self.results_per_page)
except HttpResponseError as error:
process_storage_error(error)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -25,7 +25,7 @@
from urlparse import urlparse # type: ignore
from urllib2 import quote, unquote # type: ignore

from azure.core.exceptions import HttpResponseError, ResourceExistsError
from azure.core.exceptions import HttpResponseError
from azure.core.tracing.decorator import distributed_trace
from azure.core.tracing.decorator_async import distributed_trace_async

Expand Down Expand Up @@ -154,28 +154,6 @@ async def create_queue(self, **kwargs):
except HttpResponseError as error:
process_storage_error(error)

@distributed_trace_async
async def create_if_not_exists(self, **kwargs):
# type: (Any) -> None
"""Creates a new queue in the storage account.

If a queue with the same name already exists, it is not changed.

:keyword Dict(str,str) metadata:
A dict containing name-value pairs to associate with the queue as
metadata. Note that metadata names preserve the case with which they
were created, but are case-insensitive when set or read.
:keyword int timeout:
The server timeout, expressed in seconds.
:return: None or the result of cls(response)
:rtype: None
:raises: StorageErrorException
"""
try:
return await self.create_queue(**kwargs)
except ResourceExistsError:
return None

@distributed_trace_async
async def delete_queue(self, **kwargs):
# type: (Optional[Any]) -> None
Expand Down Expand Up @@ -486,11 +464,13 @@ def receive_messages(self, **kwargs):
# type: (Optional[Any]) -> AsyncItemPaged[QueueMessage]
"""Removes one or more messages from the front of the queue.

When a message is retrieved from the queue, the response includes the message
When a message is retrieved from the queue, the response includes the message
content and a pop_receipt value, which is required to delete the message.
The message is not automatically deleted from the queue, but after it has
been retrieved, it is not visible to other clients for the time interval
specified by the visibility_timeout parameter.
specified by the visibility_timeout parameter. The iterator will continuously
fetch messages until the queue is empty or max_messages is reached (if max_messages
is set).

If the key-encryption-key or resolver field is set on the local service object, the messages will be
decrypted before being returned.
Expand All @@ -511,6 +491,8 @@ def receive_messages(self, **kwargs):
should be set to a value smaller than the time-to-live value.
:keyword int timeout:
The server timeout, expressed in seconds.
:keyword int max_messages:
An integer that specifies the maximum number of messages to retrieve from the queue.
:return:
Returns a message iterator of dict-like Message objects.
:rtype: ~azure.core.async_paging.AsyncItemPaged[~azure.storage.queue.QueueMessage]
Expand All @@ -527,6 +509,7 @@ def receive_messages(self, **kwargs):
messages_per_page = kwargs.pop('messages_per_page', None)
visibility_timeout = kwargs.pop('visibility_timeout', None)
timeout = kwargs.pop('timeout', None)
max_messages = kwargs.pop('max_messages', None)
self._config.message_decode_policy.configure(
require_encryption=self.require_encryption,
key_encryption_key=self.key_encryption_key,
Expand All @@ -540,7 +523,8 @@ def receive_messages(self, **kwargs):
cls=self._config.message_decode_policy,
**kwargs
)
return AsyncItemPaged(command, results_per_page=messages_per_page, page_iterator_class=MessagesPaged)
return AsyncItemPaged(command, results_per_page=messages_per_page,
page_iterator_class=MessagesPaged, max_messages=max_messages)
except HttpResponseError as error:
process_storage_error(error)

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -300,6 +300,47 @@ def update_message(self):
# Delete the queue
queue.delete_queue()

def receive_messages_with_max_messages(self):
# Instantiate a queue client
from azure.storage.queue import QueueClient
queue = QueueClient.from_connection_string(self.connection_string, "myqueue9")

# Create the queue
queue.create_queue()

try:
# [START send_messages]
queue.send_message(u"message1")
queue.send_message(u"message2")
queue.send_message(u"message3")
queue.send_message(u"message4")
queue.send_message(u"message5")
queue.send_message(u"message6")
queue.send_message(u"message7")
queue.send_message(u"message8")
queue.send_message(u"message9")
queue.send_message(u"message10")
# [END send_messages]

# [START receive_messages]
# Receive messages one-by-one
messages = queue.receive_messages(max_messages=5)
for msg in messages:
print(msg.content)
queue.delete_message(msg)
# [END receive_messages]

# Only prints 5 messages because 'max_messages'=5
# >>message1
# >>message2
# >>message3
# >>message4
# >>message5

finally:
# Delete the queue
queue.delete_queue()


if __name__ == '__main__':
sample = QueueMessageSamples()
Expand All @@ -311,3 +352,4 @@ def update_message(self):
sample.delete_and_clear_messages()
sample.peek_messages()
sample.update_message()
sample.receive_messages_with_max_messages()
Original file line number Diff line number Diff line change
Expand Up @@ -279,6 +279,49 @@ async def update_message_async(self):
finally:
# Delete the queue
await queue.delete_queue()

async def receive_messages_with_max_messages(self):
# Instantiate a queue client
from azure.storage.queue.aio import QueueClient
queue = QueueClient.from_connection_string(self.connection_string, "myqueue7")

# Create the queue
async with queue:
await queue.create_queue()

try:
# [START send_messages]
await queue.send_message(u"message1")
await queue.send_message(u"message2")
await queue.send_message(u"message3")
await queue.send_message(u"message4")
await queue.send_message(u"message5")
await queue.send_message(u"message6")
await queue.send_message(u"message7")
await queue.send_message(u"message8")
await queue.send_message(u"message9")
await queue.send_message(u"message10")

# [END send_messages]

# [START receive_messages]
# Receive messages one-by-one
messages = queue.receive_messages(max_messages=5)
async for msg in messages:
print(msg.content)
await queue.delete_message(msg)
# [END receive_messages]

# Only prints 5 messages because 'max_messages'=5
# >>message1
# >>message2
# >>message3
# >>message4
# >>message5

finally:
# Delete the queue
await queue.delete_queue()


async def main():
Expand All @@ -290,7 +333,8 @@ async def main():
await sample.delete_and_clear_messages_async()
await sample.peek_messages_async()
await sample.update_message_async()
await sample.receive_messages_with_max_messages()

if __name__ == '__main__':
loop = asyncio.get_event_loop()
loop.run_until_complete(main())
loop.run_until_complete(main())
Loading