@@ -987,7 +987,7 @@ def download_blob(self, blob, offset=None, length=None, **kwargs):
         # type: (Union[str, BlobProperties], Optional[int], Optional[int], **Any) -> StorageStreamDownloader
         """Downloads a blob to the StorageStreamDownloader. The readall() method must
         be used to read all the content or readinto() must be used to download the blob into
-        a stream.
+        a stream. Using chunks() returns an iterator which allows the user to iterate over the content in chunks.
 
         :param blob: The blob with which to interact. If specified, this value will override
             a blob value specified in the blob URL.
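For reference, a minimal sketch of how the chunks() API described in this docstring is consumed on the sync client; the connection string and the container/blob names are placeholders, not part of this PR:

from azure.storage.blob import BlobClient

# Placeholder connection details for illustration only.
blob_client = BlobClient.from_connection_string(
    "<connection-string>", container_name="mycontainer", blob_name="myblob")

# download_blob() returns a StorageStreamDownloader; chunks() yields the
# content piece by piece instead of buffering the whole blob in memory.
stream = blob_client.download_blob()
total = 0
for chunk in stream.chunks():  # each chunk is a bytes object
    total += len(chunk)
print("Downloaded {} bytes in chunks".format(total))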
49 changes: 42 additions & 7 deletions sdk/storage/azure-storage-blob/azure/storage/blob/_download.py
@@ -9,6 +9,7 @@
 import warnings
 from io import BytesIO
 
+from typing import Iterator
 from azure.core.exceptions import HttpResponseError
 from azure.core.tracing.common import with_current_context
 from ._shared.encryption import decrypt_blob
@@ -213,8 +214,9 @@ def _download_chunk(self, chunk_start, chunk_end):
 class _ChunkIterator(object):
     """Iterator for chunks in blob download stream."""
 
-    def __init__(self, size, content, downloader):
+    def __init__(self, size, content, downloader, chunk_size):
         self.size = size
+        self._chunk_size = chunk_size
         self._current_content = content
         self._iter_downloader = downloader
         self._iter_chunks = None
@@ -231,21 +233,39 @@ def __next__(self):
         if self._complete:
             raise StopIteration("Download complete")
         if not self._iter_downloader:
-            # If no iterator was supplied, the download completed with
-            # the initial GET, so we just return that data
+            # cut the data obtained from the initial GET into chunks
+            if len(self._current_content) > self._chunk_size:
+                return self._get_chunk_data()
             self._complete = True
             return self._current_content
 
         if not self._iter_chunks:
             self._iter_chunks = self._iter_downloader.get_chunk_offsets()
-        else:
+
+        # initial GET result still has more than _chunk_size bytes of data
+        if len(self._current_content) >= self._chunk_size:
+            return self._get_chunk_data()
+
+        try:
             chunk = next(self._iter_chunks)
-            self._current_content = self._iter_downloader.yield_chunk(chunk)
+            self._current_content += self._iter_downloader.yield_chunk(chunk)
+        except StopIteration as e:
+            self._complete = True
+            if self._current_content:
+                return self._current_content
+            raise e
 
-        return self._current_content
+        # the current content from the first GET is still there but smaller than
+        # the chunk size, so we want to make sure it is also included
+        return self._get_chunk_data()
Contributor comment:

What's the reason for calling self._get_chunk_data() in this case? As far as I understand, if we hit this case (we're done iterating the first single GET call and are now iterating through our download chunks as usual), the current content should always be the same size as the chunk size, so we could just return self._current_content. It seems from your try/except that there is an edge case where there will be excess content. Could you put a small comment on that in the code? It's not obvious to me, haha.

EDIT: I just realized the case is when the current content from the first GET is still there but smaller than the chunk size, so we want to make sure it is also included. I think adding a comment in the code would be very helpful here! Good work thinking of all the cases.
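To make the edge case concrete: the iterator rechunks a stream of variably sized pieces (the initial GET result plus each subsequently downloaded range) into fixed-size slices, flushing whatever tail remains once the source is exhausted. A standalone sketch of the same buffering technique, assuming nothing from the SDK (all names here are illustrative):

def rechunk(pieces, chunk_size):
    # Accumulate incoming pieces and slice off fixed-size chunks, which is
    # what _get_chunk_data() does with _current_content.
    buffer = b""
    for piece in pieces:
        buffer += piece
        while len(buffer) >= chunk_size:
            yield buffer[:chunk_size]
            buffer = buffer[chunk_size:]
    # Mirror of the StopIteration branch: flush the leftover tail, which can
    # be smaller than chunk_size.
    if buffer:
        yield buffer

assert list(rechunk([b"abcde", b"fg"], 3)) == [b"abc", b"def", b"g"]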


         next = __next__  # Python 2 compatibility.
 
+    def _get_chunk_data(self):
+        chunk_data = self._current_content[: self._chunk_size]
+        self._current_content = self._current_content[self._chunk_size:]
+        return chunk_data


 class StorageStreamDownloader(object):  # pylint: disable=too-many-instance-attributes
     """A streaming object to download from Azure Storage.
@@ -426,6 +446,20 @@ def _initial_request(self):
         return response
 
     def chunks(self):
+        # type: () -> Iterator[bytes]
+        """Iterate over chunks in the download stream.
+
+        :rtype: Iterator[bytes]
+
+        .. admonition:: Example:
+
+            .. literalinclude:: ../samples/blob_samples_hello_world.py
+                :start-after: [START download_a_blob_in_chunk]
+                :end-before: [END download_a_blob_in_chunk]
+                :language: python
+                :dedent: 12
+                :caption: Download a blob using chunks().
+        """
         if self.size == 0 or self._download_complete:
             iter_downloader = None
         else:
@@ -451,7 +485,8 @@ def chunks(self):
         return _ChunkIterator(
             size=self.size,
             content=self._current_content,
-            downloader=iter_downloader)
+            downloader=iter_downloader,
+            chunk_size=self._config.max_chunk_get_size)
 
     def readall(self):
         """Download the contents of this blob.
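Because the iterator's chunk size is taken from self._config.max_chunk_get_size, the granularity of chunks() can be tuned when the client is constructed. A hedged sketch; the sizes below are arbitrary examples, not recommendations:

from azure.storage.blob import BlobServiceClient

# max_single_get_size bounds the initial GET; max_chunk_get_size bounds each
# subsequent range request, and therefore the size of each item that
# chunks() yields.
service_client = BlobServiceClient.from_connection_string(
    "<connection-string>",
    max_single_get_size=32 * 1024 * 1024,
    max_chunk_get_size=4 * 1024 * 1024)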
@@ -377,7 +377,7 @@ async def download_blob(self, offset=None, length=None, **kwargs):
         # type: (Optional[int], Optional[int], Any) -> StorageStreamDownloader
         """Downloads a blob to the StorageStreamDownloader. The readall() method must
         be used to read all the content or readinto() must be used to download the blob into
-        a stream. Using chunks() returns an iterator which allows the user to iterate over the content in chunks.
+        a stream. Using chunks() returns an async iterator which allows the user to iterate over the content in chunks.
 
         :param int offset:
             Start of byte range to use for downloading a section of the blob.
@@ -857,7 +857,7 @@ async def download_blob(self, blob, offset=None, length=None, **kwargs):
         # type: (Union[str, BlobProperties], Optional[int], Optional[int], Any) -> StorageStreamDownloader
         """Downloads a blob to the StorageStreamDownloader. The readall() method must
         be used to read all the content or readinto() must be used to download the blob into
-        a stream.
+        a stream. Using chunks() returns an async iterator which allows the user to iterate over the content in chunks.
 
         :param blob: The blob with which to interact. If specified, this value will override
             a blob value specified in the blob URL.
@@ -11,6 +11,7 @@
 from itertools import islice
 import warnings
 
+from typing import AsyncIterator
 from azure.core.exceptions import HttpResponseError
 from .._shared.encryption import decrypt_blob
 from .._shared.request_handlers import validate_and_format_range_headers
@@ -115,8 +116,9 @@ async def _download_chunk(self, chunk_start, chunk_end):
 class _AsyncChunkIterator(object):
     """Async iterator for chunks in blob download stream."""
 
-    def __init__(self, size, content, downloader):
+    def __init__(self, size, content, downloader, chunk_size):
         self.size = size
+        self._chunk_size = chunk_size
         self._current_content = content
         self._iter_downloader = downloader
         self._iter_chunks = None
@@ -136,21 +138,35 @@ async def __anext__(self):
         if self._complete:
             raise StopAsyncIteration("Download complete")
         if not self._iter_downloader:
-            # If no iterator was supplied, the download completed with
-            # the initial GET, so we just return that data
+            # cut the data obtained from the initial GET into chunks
+            if len(self._current_content) > self._chunk_size:
+                return self._get_chunk_data()
             self._complete = True
             return self._current_content
 
         if not self._iter_chunks:
             self._iter_chunks = self._iter_downloader.get_chunk_offsets()
-        else:
-            try:
-                chunk = next(self._iter_chunks)
-            except StopIteration:
-                raise StopAsyncIteration("Download complete")
-            self._current_content = await self._iter_downloader.yield_chunk(chunk)
 
-        return self._current_content
+        # initial GET result still has more than _chunk_size bytes of data
+        if len(self._current_content) >= self._chunk_size:
+            return self._get_chunk_data()
+
+        try:
+            chunk = next(self._iter_chunks)
+            self._current_content += await self._iter_downloader.yield_chunk(chunk)
+        except StopIteration:
+            self._complete = True
+            # it's likely that there is some data left in self._current_content
+            if self._current_content:
+                return self._current_content
+            raise StopAsyncIteration("Download complete")
+
+        return self._get_chunk_data()
+
+    def _get_chunk_data(self):
+        chunk_data = self._current_content[: self._chunk_size]
+        self._current_content = self._current_content[self._chunk_size:]
+        return chunk_data


 class StorageStreamDownloader(object):  # pylint: disable=too-many-instance-attributes
@@ -325,9 +341,19 @@ async def _initial_request(self):
         return response
 
     def chunks(self):
+        # type: () -> AsyncIterator[bytes]
         """Iterate over chunks in the download stream.
 
-        :rtype: Iterable[bytes]
+        :rtype: AsyncIterator[bytes]
+
+        .. admonition:: Example:
+
+            .. literalinclude:: ../samples/blob_samples_hello_world_async.py
+                :start-after: [START download_a_blob_in_chunk]
+                :end-before: [END download_a_blob_in_chunk]
+                :language: python
+                :dedent: 16
+                :caption: Download a blob using chunks().
         """
         if self.size == 0 or self._download_complete:
             iter_downloader = None
@@ -353,7 +379,8 @@ def chunks(self):
         return _AsyncChunkIterator(
             size=self.size,
             content=self._current_content,
-            downloader=iter_downloader)
+            downloader=iter_downloader,
+            chunk_size=self._config.max_chunk_get_size)
 
     async def readall(self):
         """Download the contents of this blob.
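The async counterpart mirrors the sync API, except the iterator is consumed with async for. A minimal sketch with placeholder connection details and names:

import asyncio
from azure.storage.blob.aio import BlobClient

async def main():
    blob_client = BlobClient.from_connection_string(
        "<connection-string>", container_name="mycontainer", blob_name="myblob")
    async with blob_client:
        stream = await blob_client.download_blob()
        async for chunk in stream.chunks():  # chunks() itself is not awaited
            print(len(chunk))

asyncio.run(main())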
@@ -110,7 +110,7 @@ def stream_block_blob(self):
         source_blob_client.upload_blob(data, blob_type="BlockBlob")
 
         destination_blob_client = container_client.get_blob_client("destination_blob")
-
+        # [START download_a_blob_in_chunk]
         # This returns a StorageStreamDownloader.
         stream = source_blob_client.download_blob()
         block_list = []
@@ -122,6 +122,8 @@ def stream_block_blob(self):
             destination_blob_client.stage_block(block_id=block_id, data=chunk)
             block_list.append(BlobBlock(block_id=block_id))
 
+        # [END download_a_blob_in_chunk]
+
         # Upload the whole chunk to azure storage and make up one blob
         destination_blob_client.commit_block_list(block_list)
@@ -116,6 +116,7 @@ async def stream_block_blob(self):
 
         destination_blob_client = container_client.get_blob_client("destination_blob")
 
+        # [START download_a_blob_in_chunk]
         # This returns a StorageStreamDownloader.
        stream = await source_blob_client.download_blob()
         block_list = []
@@ -126,6 +127,7 @@ async def stream_block_blob(self):
             block_id = str(uuid.uuid4())
             await destination_blob_client.stage_block(block_id=block_id, data=chunk)
             block_list.append(BlobBlock(block_id=block_id))
+        # [END download_a_blob_in_chunk]
 
         # Upload the whole chunk to azure storage and make up one blob
         await destination_blob_client.commit_block_list(block_list)