diff --git a/sdk/storage/azure-storage-file/azure/storage/file/_shared/base_client.py b/sdk/storage/azure-storage-file/azure/storage/file/_shared/base_client.py index 166d7e81a2ae..104d323d93df 100644 --- a/sdk/storage/azure-storage-file/azure/storage/file/_shared/base_client.py +++ b/sdk/storage/azure-storage-file/azure/storage/file/_shared/base_client.py @@ -20,6 +20,7 @@ from azure.core import Configuration from azure.core.pipeline import Pipeline from azure.core.pipeline.transport import RequestsTransport +from azure.core.pipeline.policies.distributed_tracing import DistributedTracingPolicy from azure.core.pipeline.policies import ( RedirectPolicy, ContentDecodePolicy, @@ -175,6 +176,7 @@ def _create_pipeline(self, credential, **kwargs): config.retry_policy, config.logging_policy, StorageResponseHook(**kwargs), + DistributedTracingPolicy(), ] return config, Pipeline(config.transport, policies=policies) diff --git a/sdk/storage/azure-storage-file/azure/storage/file/_shared/base_client_async.py b/sdk/storage/azure-storage-file/azure/storage/file/_shared/base_client_async.py index 567303a12521..7f1d67dc31db 100644 --- a/sdk/storage/azure-storage-file/azure/storage/file/_shared/base_client_async.py +++ b/sdk/storage/azure-storage-file/azure/storage/file/_shared/base_client_async.py @@ -15,6 +15,8 @@ from azure.core.pipeline.transport import AioHttpTransport as AsyncTransport except ImportError: from azure.core.pipeline.transport import AsyncioRequestsTransport as AsyncTransport + +from azure.core.pipeline.policies.distributed_tracing import DistributedTracingPolicy from azure.core.pipeline.policies import ( ContentDecodePolicy, BearerTokenCredentialPolicy, @@ -81,5 +83,6 @@ def _create_pipeline(self, credential, **kwargs): config.retry_policy, config.logging_policy, AsyncStorageResponseHook(**kwargs), + DistributedTracingPolicy(), ] return config, AsyncPipeline(config.transport, policies=policies) diff --git a/sdk/storage/azure-storage-file/azure/storage/file/_shared/downloads.py b/sdk/storage/azure-storage-file/azure/storage/file/_shared/downloads.py index 1d46ffc95293..1d032e5f9be7 100644 --- a/sdk/storage/azure-storage-file/azure/storage/file/_shared/downloads.py +++ b/sdk/storage/azure-storage-file/azure/storage/file/_shared/downloads.py @@ -9,6 +9,7 @@ from io import BytesIO from azure.core.exceptions import HttpResponseError +from azure.core.tracing.context import tracing_context from .request_handlers import validate_and_format_range_headers from .response_handlers import process_storage_error, parse_length_from_content_range @@ -454,7 +455,7 @@ def download_to_stream(self, stream, max_connections=1): if max_connections > 1: import concurrent.futures executor = concurrent.futures.ThreadPoolExecutor(max_connections) - list(executor.map(downloader.process_chunk, downloader.get_chunk_offsets())) + list(executor.map(tracing_context.with_current_context(downloader.process_chunk), downloader.get_chunk_offsets())) else: for chunk in downloader.get_chunk_offsets(): downloader.process_chunk(chunk) diff --git a/sdk/storage/azure-storage-file/azure/storage/file/_shared/uploads.py b/sdk/storage/azure-storage-file/azure/storage/file/_shared/uploads.py index 2b269fb1d0ba..2df85b7771c9 100644 --- a/sdk/storage/azure-storage-file/azure/storage/file/_shared/uploads.py +++ b/sdk/storage/azure-storage-file/azure/storage/file/_shared/uploads.py @@ -13,6 +13,8 @@ import six +from azure.core.tracing.context import tracing_context + from . import encode_base64, url_quote from .request_handlers import get_length from .response_handlers import return_response_headers @@ -34,7 +36,7 @@ def _parallel_uploads(executor, uploader, pending, running): except StopIteration: break else: - running.add(executor.submit(uploader.process_chunk, next_chunk)) + running.add(executor.submit(tracing_context.with_current_context(uploader.process_chunk), next_chunk)) # Wait for the remaining uploads to finish done, _running = futures.wait(running) @@ -79,7 +81,7 @@ def upload_data_chunks( executor = futures.ThreadPoolExecutor(max_connections) upload_tasks = uploader.get_chunk_streams() running_futures = [ - executor.submit(uploader.process_chunk, u) + executor.submit(tracing_context.with_current_context(uploader.process_chunk), u) for u in islice(upload_tasks, 0, max_connections) ] range_ids = _parallel_uploads(executor, uploader, upload_tasks, running_futures) @@ -115,7 +117,7 @@ def upload_substream_blocks( executor = futures.ThreadPoolExecutor(max_connections) upload_tasks = uploader.get_substream_blocks() running_futures = [ - executor.submit(uploader.process_substream_block, u) + executor.submit(tracing_context.with_current_context(uploader.process_substream_block), u) for u in islice(upload_tasks, 0, max_connections) ] return _parallel_uploads(executor, uploader, upload_tasks, running_futures) diff --git a/sdk/storage/azure-storage-file/azure/storage/file/aio/directory_client_async.py b/sdk/storage/azure-storage-file/azure/storage/file/aio/directory_client_async.py index 3d894da41a00..2cd010abf3ea 100644 --- a/sdk/storage/azure-storage-file/azure/storage/file/aio/directory_client_async.py +++ b/sdk/storage/azure-storage-file/azure/storage/file/aio/directory_client_async.py @@ -11,6 +11,9 @@ from azure.core.polling import async_poller +from azure.core.tracing.decorator import distributed_trace +from azure.core.tracing.decorator_async import distributed_trace_async + from .._generated.aio import AzureFileStorage from .._generated.version import VERSION from .._generated.models import StorageErrorException @@ -132,6 +135,7 @@ def get_subdirectory_client(self, directory_name, **kwargs): _hosts=self._hosts, _configuration=self._config, _pipeline=self._pipeline, _location_mode=self._location_mode, loop=self._loop, **kwargs) + @distributed_trace_async async def create_directory( # type: ignore self, metadata=None, # type: Optional[Dict[str, str]] timeout=None, # type: Optional[int] @@ -167,6 +171,7 @@ async def create_directory( # type: ignore except StorageErrorException as error: process_storage_error(error) + @distributed_trace_async async def delete_directory(self, timeout=None, **kwargs): # type: (Optional[int], **Any) -> None """Marks the directory for deletion. The directory is @@ -189,6 +194,7 @@ async def delete_directory(self, timeout=None, **kwargs): except StorageErrorException as error: process_storage_error(error) + @distributed_trace def list_directories_and_files(self, name_starts_with=None, marker=None, timeout=None, **kwargs): # type: (Optional[str], Optional[str], Optional[int], **Any) -> DirectoryProperties """Lists all the directories and files under the directory. @@ -222,6 +228,7 @@ def list_directories_and_files(self, name_starts_with=None, marker=None, timeout return DirectoryPropertiesPaged( command, prefix=name_starts_with, results_per_page=results_per_page, marker=marker) + @distributed_trace def list_handles(self, marker=None, recursive=False, timeout=None, **kwargs): """Lists opened handles on a directory or a file under the directory. @@ -247,6 +254,7 @@ def list_handles(self, marker=None, recursive=False, timeout=None, **kwargs): return HandlesPaged( command, results_per_page=results_per_page, marker=marker) + @distributed_trace_async async def close_handles( self, handle=None, # type: Union[str, HandleItem] recursive=False, # type: bool @@ -295,6 +303,7 @@ async def close_handles( None, polling_method) + @distributed_trace_async async def get_directory_properties(self, timeout=None, **kwargs): # type: (Optional[int], Any) -> DirectoryProperties """Returns all user-defined metadata and system properties for the @@ -315,6 +324,7 @@ async def get_directory_properties(self, timeout=None, **kwargs): process_storage_error(error) return response # type: ignore + @distributed_trace_async async def set_directory_metadata(self, metadata, timeout=None, **kwargs): # type: ignore # type: (Dict[str, Any], Optional[int], Any) -> Dict[str, Any] """Sets the metadata for the directory. @@ -342,6 +352,7 @@ async def set_directory_metadata(self, metadata, timeout=None, **kwargs): # type except StorageErrorException as error: process_storage_error(error) + @distributed_trace_async async def create_subdirectory( self, directory_name, # type: str metadata=None, #type: Optional[Dict[str, Any]] @@ -374,6 +385,7 @@ async def create_subdirectory( await subdir.create_directory(metadata=metadata, timeout=timeout, **kwargs) return subdir # type: ignore + @distributed_trace_async async def delete_subdirectory( self, directory_name, # type: str timeout=None, # type: Optional[int] @@ -399,6 +411,7 @@ async def delete_subdirectory( subdir = self.get_subdirectory_client(directory_name) await subdir.delete_directory(timeout=timeout, **kwargs) + @distributed_trace_async async def upload_file( self, file_name, # type: str data, # type: Any @@ -463,6 +476,7 @@ async def upload_file( **kwargs) return file_client # type: ignore + @distributed_trace_async async def delete_file( self, file_name, # type: str timeout=None, # type: Optional[int] diff --git a/sdk/storage/azure-storage-file/azure/storage/file/aio/file_client_async.py b/sdk/storage/azure-storage-file/azure/storage/file/aio/file_client_async.py index 13726d9100cf..57610f00e1cb 100644 --- a/sdk/storage/azure-storage-file/azure/storage/file/aio/file_client_async.py +++ b/sdk/storage/azure-storage-file/azure/storage/file/aio/file_client_async.py @@ -6,14 +6,14 @@ import functools from io import BytesIO -from typing import ( # pylint: disable=unused-import - Optional, Union, IO, List, Dict, Any, Iterable, - TYPE_CHECKING -) +from typing import Optional, Union, IO, List, Dict, Any, Iterable, TYPE_CHECKING # pylint: disable=unused-import import six from azure.core.polling import async_poller +from azure.core.tracing.decorator import distributed_trace +from azure.core.tracing.decorator_async import distributed_trace_async + from .._generated.aio import AzureFileStorage from .._generated.version import VERSION from .._generated.models import StorageErrorException, FileHTTPHeaders @@ -35,25 +35,22 @@ async def _upload_file_helper( - client, - stream, - size, - metadata, - content_settings, - validate_content, - timeout, - max_connections, - file_settings, - **kwargs): + client, + stream, + size, + metadata, + content_settings, + validate_content, + timeout, + max_connections, + file_settings, + **kwargs +): try: if size is None or size < 0: raise ValueError("A content size must be specified for a File.") response = await client.create_file( - size, - content_settings=content_settings, - metadata=metadata, - timeout=timeout, - **kwargs + size, content_settings=content_settings, metadata=metadata, timeout=timeout, **kwargs ) if size == 0: return response @@ -67,7 +64,8 @@ async def _upload_file_helper( max_connections=max_connections, validate_content=validate_content, timeout=timeout, - **kwargs) + **kwargs + ) except StorageErrorException as error: process_storage_error(error) @@ -108,35 +106,34 @@ class FileClient(AsyncStorageAccountHostsMixin, FileClientBase): account URL already has a SAS token. The value can be a SAS token string or an account shared access key. """ - def __init__( # type: ignore - self, file_url, # type: str - share=None, # type: Optional[Union[str, ShareProperties]] - file_path=None, # type: Optional[str] - snapshot=None, # type: Optional[Union[str, Dict[str, Any]]] - credential=None, # type: Optional[Any] - loop=None, # type: Any - **kwargs # type: Any - ): + + def __init__( # type: ignore + self, + file_url, # type: str + share=None, # type: Optional[Union[str, ShareProperties]] + file_path=None, # type: Optional[str] + snapshot=None, # type: Optional[Union[str, Dict[str, Any]]] + credential=None, # type: Optional[Any] + loop=None, # type: Any + **kwargs # type: Any + ): # type: (...) -> None - kwargs['retry_policy'] = kwargs.get('retry_policy') or ExponentialRetry(**kwargs) + kwargs["retry_policy"] = kwargs.get("retry_policy") or ExponentialRetry(**kwargs) super(FileClient, self).__init__( - file_url, - share=share, - file_path=file_path, - snapshot=snapshot, - credential=credential, - loop=loop, - **kwargs) + file_url, share=share, file_path=file_path, snapshot=snapshot, credential=credential, loop=loop, **kwargs + ) self._client = AzureFileStorage(version=VERSION, url=self.url, pipeline=self._pipeline, loop=loop) self._loop = loop - async def create_file( # type: ignore - self, size, # type: int - content_settings=None, # type: Optional[ContentSettings] - metadata=None, # type: Optional[Dict[str, str]] - timeout=None, # type: Optional[int] - **kwargs # type: Any - ): + @distributed_trace_async + async def create_file( # type: ignore + self, + size, # type: int + content_settings=None, # type: Optional[ContentSettings] + metadata=None, # type: Optional[Dict[str, str]] + timeout=None, # type: Optional[int] + **kwargs # type: Any + ): # type: (...) -> Dict[str, Any] """Creates a new file. @@ -165,7 +162,7 @@ async def create_file( # type: ignore if self.require_encryption and not self.key_encryption_key: raise ValueError("Encryption required but no key was provided.") - headers = kwargs.pop('headers', {}) + headers = kwargs.pop("headers", {}) headers.update(add_metadata_headers(metadata)) file_http_headers = None if content_settings: @@ -175,31 +172,34 @@ async def create_file( # type: ignore file_content_md5=bytearray(content_settings.content_md5) if content_settings.content_md5 else None, file_content_encoding=content_settings.content_encoding, file_content_language=content_settings.content_language, - file_content_disposition=content_settings.content_disposition + file_content_disposition=content_settings.content_disposition, ) try: - return await self._client.file.create( # type: ignore + return await self._client.file.create( # type: ignore file_content_length=size, timeout=timeout, metadata=metadata, file_http_headers=file_http_headers, headers=headers, cls=return_response_headers, - **kwargs) + **kwargs + ) except StorageErrorException as error: process_storage_error(error) + @distributed_trace_async async def upload_file( - self, data, # type: Any - length=None, # type: Optional[int] - metadata=None, # type: Optional[Dict[str, str]] - content_settings=None, # type: Optional[ContentSettings] - validate_content=False, # type: bool - max_connections=1, # type: Optional[int] - timeout=None, # type: Optional[int] - encoding='UTF-8', # type: str - **kwargs # type: Any - ): + self, + data, # type: Any + length=None, # type: Optional[int] + metadata=None, # type: Optional[Dict[str, str]] + content_settings=None, # type: Optional[ContentSettings] + validate_content=False, # type: bool + max_connections=1, # type: Optional[int] + timeout=None, # type: Optional[int] + encoding="UTF-8", # type: str + **kwargs # type: Any + ): # type: (...) -> Dict[str, Any] """Uploads a new file. @@ -248,13 +248,13 @@ async def upload_file( if isinstance(data, bytes): stream = BytesIO(data) - elif hasattr(data, 'read'): + elif hasattr(data, "read"): stream = data - elif hasattr(data, '__iter__'): - stream = IterStreamer(data, encoding=encoding) # type: ignore + elif hasattr(data, "__iter__"): + stream = IterStreamer(data, encoding=encoding) # type: ignore else: raise TypeError("Unsupported data type: {}".format(type(data))) - return await _upload_file_helper( # type: ignore + return await _upload_file_helper( # type: ignore self, stream, length, @@ -264,14 +264,17 @@ async def upload_file( timeout, max_connections, self._config, - **kwargs) + **kwargs + ) + @distributed_trace_async async def start_copy_from_url( - self, source_url, # type: str - metadata=None, # type: Optional[Dict[str, str]] - timeout=None, # type: Optional[int] - **kwargs # type: Any - ): + self, + source_url, # type: str + metadata=None, # type: Optional[Dict[str, str]] + timeout=None, # type: Optional[int] + **kwargs # type: Any + ): # type: (...) -> Any """Initiates the copying of data from a source URL into the file referenced by the client. @@ -296,20 +299,17 @@ async def start_copy_from_url( :dedent: 12 :caption: Copy a file from a URL """ - headers = kwargs.pop('headers', {}) + headers = kwargs.pop("headers", {}) headers.update(add_metadata_headers(metadata)) try: return await self._client.file.start_copy( - source_url, - timeout=timeout, - metadata=metadata, - headers=headers, - cls=return_response_headers, - **kwargs) + source_url, timeout=timeout, metadata=metadata, headers=headers, cls=return_response_headers, **kwargs + ) except StorageErrorException as error: process_storage_error(error) + @distributed_trace_async async def abort_copy(self, copy_id, timeout=None, **kwargs): # type: (Union[str, FileProperties], Optional[int], Any) -> Dict[str, Any] """Abort an ongoing copy operation. @@ -327,7 +327,7 @@ async def abort_copy(self, copy_id, timeout=None, **kwargs): copy_id = copy_id.copy.id except AttributeError: try: - copy_id = copy_id['copy_id'] + copy_id = copy_id["copy_id"] except TypeError: pass try: @@ -335,13 +335,15 @@ async def abort_copy(self, copy_id, timeout=None, **kwargs): except StorageErrorException as error: process_storage_error(error) + @distributed_trace_async async def download_file( - self, offset=None, # type: Optional[int] - length=None, # type: Optional[int] - validate_content=False, # type: bool - timeout=None, # type: Optional[int] - **kwargs - ): + self, + offset=None, # type: Optional[int] + length=None, # type: Optional[int] + validate_content=False, # type: bool + timeout=None, # type: Optional[int] + **kwargs + ): # type: (...) -> Iterable[bytes] """Downloads a file to a stream with automatic chunking. @@ -386,15 +388,14 @@ async def download_file( encryption_options=None, cls=deserialize_file_stream, timeout=timeout, - **kwargs) + **kwargs + ) await downloader.setup( - extra_properties={ - 'share': self.share_name, - 'name': self.file_name, - 'path': '/'.join(self.file_path), - }) + extra_properties={"share": self.share_name, "name": self.file_name, "path": "/".join(self.file_path)} + ) return downloader + @distributed_trace_async async def delete_file(self, timeout=None, **kwargs): # type: (Optional[int], Optional[Any]) -> None """Marks the specified file for deletion. The file is @@ -417,6 +418,7 @@ async def delete_file(self, timeout=None, **kwargs): except StorageErrorException as error: process_storage_error(error) + @distributed_trace_async async def get_file_properties(self, timeout=None, **kwargs): # type: (Optional[int], Any) -> FileProperties """Returns all user-defined metadata, standard HTTP properties, and @@ -429,20 +431,19 @@ async def get_file_properties(self, timeout=None, **kwargs): """ try: file_props = await self._client.file.get_properties( - sharesnapshot=self.snapshot, - timeout=timeout, - cls=deserialize_file_properties, - **kwargs) + sharesnapshot=self.snapshot, timeout=timeout, cls=deserialize_file_properties, **kwargs + ) except StorageErrorException as error: process_storage_error(error) file_props.name = self.file_name file_props.share = self.share_name file_props.snapshot = self.snapshot - file_props.path = '/'.join(self.file_path) - return file_props # type: ignore + file_props.path = "/".join(self.file_path) + return file_props # type: ignore - async def set_http_headers(self, content_settings, timeout=None, **kwargs): # type: ignore - #type: (ContentSettings, Optional[int], Optional[Any]) -> Dict[str, Any] + @distributed_trace_async + async def set_http_headers(self, content_settings, timeout=None, **kwargs): # type: ignore + # type: (ContentSettings, Optional[int], Optional[Any]) -> Dict[str, Any] """Sets HTTP headers on the file. :param ~azure.storage.file.models.ContentSettings content_settings: @@ -452,27 +453,29 @@ async def set_http_headers(self, content_settings, timeout=None, **kwargs): # ty :returns: File-updated property dict (Etag and last modified). :rtype: dict(str, Any) """ - file_content_length = kwargs.pop('size', None) + file_content_length = kwargs.pop("size", None) file_http_headers = FileHTTPHeaders( file_cache_control=content_settings.cache_control, file_content_type=content_settings.content_type, file_content_md5=bytearray(content_settings.content_md5) if content_settings.content_md5 else None, file_content_encoding=content_settings.content_encoding, file_content_language=content_settings.content_language, - file_content_disposition=content_settings.content_disposition + file_content_disposition=content_settings.content_disposition, ) try: - return await self._client.file.set_http_headers( # type: ignore + return await self._client.file.set_http_headers( # type: ignore timeout=timeout, file_content_length=file_content_length, file_http_headers=file_http_headers, cls=return_response_headers, - **kwargs) + **kwargs + ) except StorageErrorException as error: process_storage_error(error) - async def set_file_metadata(self, metadata=None, timeout=None, **kwargs): # type: ignore - #type: (Optional[Dict[str, Any]], Optional[int], Optional[Any]) -> Dict[str, Any] + @distributed_trace_async + async def set_file_metadata(self, metadata=None, timeout=None, **kwargs): # type: ignore + # type: (Optional[Dict[str, Any]], Optional[int], Optional[Any]) -> Dict[str, Any] """Sets user-defined metadata for the specified file as one or more name-value pairs. @@ -488,27 +491,26 @@ async def set_file_metadata(self, metadata=None, timeout=None, **kwargs): # type :returns: File-updated property dict (Etag and last modified). :rtype: dict(str, Any) """ - headers = kwargs.pop('headers', {}) - headers.update(add_metadata_headers(metadata)) # type: ignore + headers = kwargs.pop("headers", {}) + headers.update(add_metadata_headers(metadata)) # type: ignore try: - return await self._client.file.set_metadata( # type: ignore - timeout=timeout, - cls=return_response_headers, - headers=headers, - metadata=metadata, - **kwargs) + return await self._client.file.set_metadata( # type: ignore + timeout=timeout, cls=return_response_headers, headers=headers, metadata=metadata, **kwargs + ) except StorageErrorException as error: process_storage_error(error) - async def upload_range( # type: ignore - self, data, # type: bytes - start_range, # type: int - end_range, # type: int - validate_content=False, # type: Optional[bool] - timeout=None, # type: Optional[int] - encoding='UTF-8', - **kwargs - ): + @distributed_trace_async + async def upload_range( # type: ignore + self, + data, # type: bytes + start_range, # type: int + end_range, # type: int + validate_content=False, # type: Optional[bool] + timeout=None, # type: Optional[int] + encoding="UTF-8", + **kwargs + ): # type: (...) -> Dict[str, Any] """Upload a range of bytes to a file. @@ -543,26 +545,29 @@ async def upload_range( # type: ignore if isinstance(data, six.text_type): data = data.encode(encoding) - content_range = 'bytes={0}-{1}'.format(start_range, end_range) + content_range = "bytes={0}-{1}".format(start_range, end_range) content_length = end_range - start_range + 1 try: - return await self._client.file.upload_range( # type: ignore + return await self._client.file.upload_range( # type: ignore range=content_range, content_length=content_length, optionalbody=data, timeout=timeout, validate_content=validate_content, cls=return_response_headers, - **kwargs) + **kwargs + ) except StorageErrorException as error: process_storage_error(error) - async def get_ranges( # type: ignore - self, start_range=None, # type: Optional[int] - end_range=None, # type: Optional[int] - timeout=None, # type: Optional[int] - **kwargs - ): + @distributed_trace_async + async def get_ranges( # type: ignore + self, + start_range=None, # type: Optional[int] + end_range=None, # type: Optional[int] + timeout=None, # type: Optional[int] + **kwargs + ): # type: (...) -> List[dict[str, int]] """Returns the list of valid ranges of a file. @@ -585,25 +590,25 @@ async def get_ranges( # type: ignore content_range = None if start_range is not None: if end_range is not None: - content_range = 'bytes={0}-{1}'.format(start_range, end_range) + content_range = "bytes={0}-{1}".format(start_range, end_range) else: - content_range = 'bytes={0}-'.format(start_range) + content_range = "bytes={0}-".format(start_range) try: ranges = await self._client.file.get_range_list( - sharesnapshot=self.snapshot, - timeout=timeout, - range=content_range, - **kwargs) + sharesnapshot=self.snapshot, timeout=timeout, range=content_range, **kwargs + ) except StorageErrorException as error: process_storage_error(error) - return [{'start': b.start, 'end': b.end} for b in ranges] - - async def clear_range( # type: ignore - self, start_range, # type: int - end_range, # type: int - timeout=None, # type: Optional[int] - **kwargs - ): + return [{"start": b.start, "end": b.end} for b in ranges] + + @distributed_trace_async + async def clear_range( # type: ignore + self, + start_range, # type: int + end_range, # type: int + timeout=None, # type: Optional[int] + **kwargs + ): # type: (...) -> Dict[str, Any] """Clears the specified range and releases the space used in storage for that range. @@ -630,19 +635,21 @@ async def clear_range( # type: ignore raise ValueError("start_range must be an integer that aligns with 512 file size") if end_range is None or end_range % 512 != 511: raise ValueError("end_range must be an integer that aligns with 512 file size") - content_range = 'bytes={0}-{1}'.format(start_range, end_range) + content_range = "bytes={0}-{1}".format(start_range, end_range) try: - return await self._client.file.upload_range( # type: ignore + return await self._client.file.upload_range( # type: ignore timeout=timeout, cls=return_response_headers, content_length=0, file_range_write="clear", range=content_range, - **kwargs) + **kwargs + ) except StorageErrorException as error: process_storage_error(error) - async def resize_file(self, size, timeout=None, **kwargs): # type: ignore + @distributed_trace_async + async def resize_file(self, size, timeout=None, **kwargs): # type: ignore # type: (int, Optional[int], Optional[Any]) -> Dict[str, Any] """Resizes a file to the specified size. @@ -654,14 +661,13 @@ async def resize_file(self, size, timeout=None, **kwargs): # type: ignore :rtype: Dict[str, Any] """ try: - return await self._client.file.set_http_headers( # type: ignore - timeout=timeout, - file_content_length=size, - cls=return_response_headers, - **kwargs) + return await self._client.file.set_http_headers( # type: ignore + timeout=timeout, file_content_length=size, cls=return_response_headers, **kwargs + ) except StorageErrorException as error: process_storage_error(error) + @distributed_trace def list_handles(self, marker=None, timeout=None, **kwargs): """Lists handles for file. @@ -673,20 +679,19 @@ def list_handles(self, marker=None, timeout=None, **kwargs): The timeout parameter is expressed in seconds. :returns: An auto-paging iterable of HandleItems """ - results_per_page = kwargs.pop('results_per_page', None) + results_per_page = kwargs.pop("results_per_page", None) command = functools.partial( - self._client.file.list_handles, - sharesnapshot=self.snapshot, - timeout=timeout, - **kwargs) - return HandlesPaged( - command, results_per_page=results_per_page, marker=marker) + self._client.file.list_handles, sharesnapshot=self.snapshot, timeout=timeout, **kwargs + ) + return HandlesPaged(command, results_per_page=results_per_page, marker=marker) + @distributed_trace_async async def close_handles( - self, handle=None, # type: Union[str, HandleItem] - timeout=None, # type: Optional[int] - **kwargs # type: Any - ): + self, + handle=None, # type: Union[str, HandleItem] + timeout=None, # type: Optional[int] + **kwargs # type: Any + ): # type: (...) -> Any """Close open file handles. @@ -703,24 +708,22 @@ async def close_handles( :rtype: ~azure.core.polling.LROPoller """ try: - handle_id = handle.id # type: ignore + handle_id = handle.id # type: ignore except AttributeError: - handle_id = handle or '*' + handle_id = handle or "*" command = functools.partial( self._client.file.force_close_handles, handle_id, timeout=timeout, sharesnapshot=self.snapshot, cls=return_response_headers, - **kwargs) + **kwargs + ) try: start_close = await command() except StorageErrorException as error: process_storage_error(error) polling_method = CloseHandlesAsync(self._config.copy_polling_interval) - return async_poller( - command, - start_close, - None, - polling_method) + return async_poller(command, start_close, None, polling_method) + diff --git a/sdk/storage/azure-storage-file/azure/storage/file/aio/file_service_client_async.py b/sdk/storage/azure-storage-file/azure/storage/file/aio/file_service_client_async.py index ffbb134c5a36..6311a44afc55 100644 --- a/sdk/storage/azure-storage-file/azure/storage/file/aio/file_service_client_async.py +++ b/sdk/storage/azure-storage-file/azure/storage/file/aio/file_service_client_async.py @@ -10,6 +10,9 @@ TYPE_CHECKING ) +from azure.core.tracing.decorator import distributed_trace +from azure.core.tracing.decorator_async import distributed_trace_async + from .._shared.base_client_async import AsyncStorageAccountHostsMixin from .._shared.response_handlers import process_storage_error from .._shared.policies_async import ExponentialRetry @@ -85,6 +88,7 @@ def __init__( self._client = AzureFileStorage(version=VERSION, url=self.url, pipeline=self._pipeline, loop=loop) self._loop = loop + @distributed_trace_async async def get_service_properties(self, timeout=None, **kwargs): # type(Optional[int]) -> Dict[str, Any] """Gets the properties of a storage account's File service, including @@ -107,6 +111,7 @@ async def get_service_properties(self, timeout=None, **kwargs): except StorageErrorException as error: process_storage_error(error) + @distributed_trace_async async def set_service_properties( self, hour_metrics=None, # type: Optional[Metrics] minute_metrics=None, # type: Optional[Metrics] @@ -154,6 +159,7 @@ async def set_service_properties( except StorageErrorException as error: process_storage_error(error) + @distributed_trace def list_shares( self, name_starts_with=None, # type: Optional[str] include_metadata=False, # type: Optional[bool] @@ -205,6 +211,7 @@ def list_shares( return SharePropertiesPaged( command, prefix=name_starts_with, results_per_page=results_per_page, marker=marker) + @distributed_trace_async async def create_share( self, share_name, # type: str metadata=None, # type: Optional[Dict[str, str]] @@ -240,6 +247,7 @@ async def create_share( await share.create_share(metadata, quota, timeout, **kwargs) return share + @distributed_trace_async async def delete_share( self, share_name, # type: Union[ShareProperties, str] delete_snapshots=False, # type: Optional[bool] diff --git a/sdk/storage/azure-storage-file/azure/storage/file/aio/share_client_async.py b/sdk/storage/azure-storage-file/azure/storage/file/aio/share_client_async.py index 18b677c288af..1fe40bb5b0cf 100644 --- a/sdk/storage/azure-storage-file/azure/storage/file/aio/share_client_async.py +++ b/sdk/storage/azure-storage-file/azure/storage/file/aio/share_client_async.py @@ -8,6 +8,9 @@ Optional, Union, Dict, Any, TYPE_CHECKING ) +from azure.core.tracing.decorator import distributed_trace +from azure.core.tracing.decorator_async import distributed_trace_async + from .._shared.policies_async import ExponentialRetry from .._shared.base_client_async import AsyncStorageAccountHostsMixin from .._shared.request_handlers import add_metadata_headers, serialize_iso @@ -112,6 +115,7 @@ def get_file_client(self, file_path): self.url, file_path=file_path, snapshot=self.snapshot, credential=self.credential, _hosts=self._hosts, _configuration=self._config, _pipeline=self._pipeline, _location_mode=self._location_mode, loop=self._loop) + @distributed_trace_async async def create_share( # type: ignore self, metadata=None, # type: Optional[Dict[str, str]] quota=None, # type: Optional[int] @@ -154,6 +158,7 @@ async def create_share( # type: ignore except StorageErrorException as error: process_storage_error(error) + @distributed_trace_async async def create_snapshot( # type: ignore self, metadata=None, # type: Optional[Dict[str, str]] timeout=None, # type: Optional[int] @@ -197,6 +202,7 @@ async def create_snapshot( # type: ignore except StorageErrorException as error: process_storage_error(error) + @distributed_trace_async async def delete_share( self, delete_snapshots=False, # type: Optional[bool] timeout=None, # type: Optional[int] @@ -232,6 +238,7 @@ async def delete_share( except StorageErrorException as error: process_storage_error(error) + @distributed_trace_async async def get_share_properties(self, timeout=None, **kwargs): # type: (Optional[int], Any) -> ShareProperties """Returns all user-defined metadata and system properties for the @@ -263,6 +270,7 @@ async def get_share_properties(self, timeout=None, **kwargs): props.snapshot = self.snapshot return props # type: ignore + @distributed_trace_async async def set_share_quota(self, quota, timeout=None, **kwargs): # type: ignore # type: (int, Optional[int], Any) -> Dict[str, Any] """Sets the quota for the share. @@ -292,6 +300,7 @@ async def set_share_quota(self, quota, timeout=None, **kwargs): # type: ignore except StorageErrorException as error: process_storage_error(error) + @distributed_trace_async async def set_share_metadata(self, metadata, timeout=None, **kwargs): # type: ignore # type: (Dict[str, Any], Optional[int], Any) -> Dict[str, Any] """Sets the metadata for the share. @@ -327,6 +336,7 @@ async def set_share_metadata(self, metadata, timeout=None, **kwargs): # type: ig except StorageErrorException as error: process_storage_error(error) + @distributed_trace_async async def get_share_access_policy(self, timeout=None, **kwargs): # type: (Optional[int], **Any) -> Dict[str, Any] """Gets the permissions for the share. The permissions @@ -349,6 +359,7 @@ async def get_share_access_policy(self, timeout=None, **kwargs): 'signed_identifiers': identifiers or [] } + @distributed_trace_async async def set_share_access_policy(self, signed_identifiers=None, timeout=None, **kwargs): # type: ignore # type: (Optional[Dict[str, Optional[AccessPolicy]]], Optional[int], **Any) -> Dict[str, str] """Sets the permissions for the share, or stored access @@ -387,6 +398,7 @@ async def set_share_access_policy(self, signed_identifiers=None, timeout=None, * except StorageErrorException as error: process_storage_error(error) + @distributed_trace_async async def get_share_stats(self, timeout=None, **kwargs): # type: ignore # type: (Optional[int], **Any) -> int """Gets the approximate size of the data stored on the share in bytes. @@ -407,6 +419,7 @@ async def get_share_stats(self, timeout=None, **kwargs): # type: ignore except StorageErrorException as error: process_storage_error(error) + @distributed_trace def list_directories_and_files( # type: ignore self, directory_name=None, # type: Optional[str] name_starts_with=None, # type: Optional[str] @@ -442,6 +455,7 @@ def list_directories_and_files( # type: ignore return directory.list_directories_and_files( name_starts_with=name_starts_with, marker=marker, timeout=timeout, **kwargs) + @distributed_trace_async async def create_directory(self, directory_name, metadata=None, timeout=None, **kwargs): # type: (str, Optional[Dict[str, Any]], Optional[int], Any) -> DirectoryClient """Creates a directory in the share and returns a client to interact diff --git a/sdk/storage/azure-storage-file/azure/storage/file/directory_client.py b/sdk/storage/azure-storage-file/azure/storage/file/directory_client.py index 3922153aebd3..63f0fbf493d0 100644 --- a/sdk/storage/azure-storage-file/azure/storage/file/directory_client.py +++ b/sdk/storage/azure-storage-file/azure/storage/file/directory_client.py @@ -16,6 +16,7 @@ import six from azure.core.polling import LROPoller +from azure.core.tracing.decorator import distributed_trace from ._generated import AzureFileStorage from ._generated.version import VERSION @@ -206,6 +207,7 @@ def get_subdirectory_client(self, directory_name, **kwargs): _hosts=self._hosts, _configuration=self._config, _pipeline=self._pipeline, _location_mode=self._location_mode, **kwargs) + @distributed_trace def create_directory( # type: ignore self, metadata=None, # type: Optional[Dict[str, str]] timeout=None, # type: Optional[int] @@ -241,6 +243,7 @@ def create_directory( # type: ignore except StorageErrorException as error: process_storage_error(error) + @distributed_trace def delete_directory(self, timeout=None, **kwargs): # type: (Optional[int], **Any) -> None """Marks the directory for deletion. The directory is @@ -263,6 +266,7 @@ def delete_directory(self, timeout=None, **kwargs): except StorageErrorException as error: process_storage_error(error) + @distributed_trace def list_directories_and_files(self, name_starts_with=None, marker=None, timeout=None, **kwargs): # type: (Optional[str], Optional[str], Optional[int], **Any) -> DirectoryProperties """Lists all the directories and files under the directory. @@ -296,6 +300,7 @@ def list_directories_and_files(self, name_starts_with=None, marker=None, timeout return DirectoryPropertiesPaged( command, prefix=name_starts_with, results_per_page=results_per_page, marker=marker) + @distributed_trace def list_handles(self, marker=None, recursive=False, timeout=None, **kwargs): """Lists opened handles on a directory or a file under the directory. @@ -321,6 +326,7 @@ def list_handles(self, marker=None, recursive=False, timeout=None, **kwargs): return HandlesPaged( command, results_per_page=results_per_page, marker=marker) + @distributed_trace def close_handles( self, handle=None, # type: Union[str, HandleItem] recursive=False, # type: bool @@ -369,6 +375,7 @@ def close_handles( None, polling_method) + @distributed_trace def get_directory_properties(self, timeout=None, **kwargs): # type: (Optional[int], Any) -> DirectoryProperties """Returns all user-defined metadata and system properties for the @@ -389,6 +396,7 @@ def get_directory_properties(self, timeout=None, **kwargs): process_storage_error(error) return response # type: ignore + @distributed_trace def set_directory_metadata(self, metadata, timeout=None, **kwargs): # type: ignore # type: (Dict[str, Any], Optional[int], Any) -> Dict[str, Any] """Sets the metadata for the directory. @@ -416,6 +424,7 @@ def set_directory_metadata(self, metadata, timeout=None, **kwargs): # type: igno except StorageErrorException as error: process_storage_error(error) + @distributed_trace def create_subdirectory( self, directory_name, # type: str metadata=None, #type: Optional[Dict[str, Any]] @@ -448,6 +457,7 @@ def create_subdirectory( subdir.create_directory(metadata=metadata, timeout=timeout, **kwargs) return subdir # type: ignore + @distributed_trace def delete_subdirectory( self, directory_name, # type: str timeout=None, # type: Optional[int] @@ -473,6 +483,7 @@ def delete_subdirectory( subdir = self.get_subdirectory_client(directory_name) subdir.delete_directory(timeout=timeout, **kwargs) + @distributed_trace def upload_file( self, file_name, # type: str data, # type: Any @@ -537,6 +548,7 @@ def upload_file( **kwargs) return file_client # type: ignore + @distributed_trace def delete_file( self, file_name, # type: str timeout=None, # type: Optional[int] diff --git a/sdk/storage/azure-storage-file/azure/storage/file/file_client.py b/sdk/storage/azure-storage-file/azure/storage/file/file_client.py index af3294a258cb..5c11c5c5e8b4 100644 --- a/sdk/storage/azure-storage-file/azure/storage/file/file_client.py +++ b/sdk/storage/azure-storage-file/azure/storage/file/file_client.py @@ -18,6 +18,7 @@ import six from azure.core.polling import LROPoller +from azure.core.tracing.decorator import distributed_trace from ._generated import AzureFileStorage from ._generated.version import VERSION @@ -313,6 +314,7 @@ def generate_shared_access_signature( content_language=content_language, content_type=content_type) + @distributed_trace def create_file( # type: ignore self, size, # type: int content_settings=None, # type: Optional[ContentSettings] @@ -372,6 +374,7 @@ def create_file( # type: ignore except StorageErrorException as error: process_storage_error(error) + @distributed_trace def upload_file( self, data, # type: Any length=None, # type: Optional[int] @@ -449,6 +452,7 @@ def upload_file( self._config, **kwargs) + @distributed_trace def start_copy_from_url( self, source_url, # type: str metadata=None, # type: Optional[Dict[str, str]] @@ -518,6 +522,7 @@ def abort_copy(self, copy_id, timeout=None, **kwargs): except StorageErrorException as error: process_storage_error(error) + @distributed_trace def download_file( self, offset=None, # type: Optional[int] length=None, # type: Optional[int] @@ -576,6 +581,7 @@ def download_file( timeout=timeout, **kwargs) + @distributed_trace def delete_file(self, timeout=None, **kwargs): # type: (Optional[int], Optional[Any]) -> None """Marks the specified file for deletion. The file is @@ -598,6 +604,7 @@ def delete_file(self, timeout=None, **kwargs): except StorageErrorException as error: process_storage_error(error) + @distributed_trace def get_file_properties(self, timeout=None, **kwargs): # type: (Optional[int], Any) -> FileProperties """Returns all user-defined metadata, standard HTTP properties, and @@ -622,6 +629,7 @@ def get_file_properties(self, timeout=None, **kwargs): file_props.path = '/'.join(self.file_path) return file_props # type: ignore + @distributed_trace def set_http_headers(self, content_settings, timeout=None, **kwargs): # type: ignore #type: (ContentSettings, Optional[int], Optional[Any]) -> Dict[str, Any] """Sets HTTP headers on the file. @@ -652,6 +660,7 @@ def set_http_headers(self, content_settings, timeout=None, **kwargs): # type: ig except StorageErrorException as error: process_storage_error(error) + @distributed_trace def set_file_metadata(self, metadata=None, timeout=None, **kwargs): # type: ignore #type: (Optional[Dict[str, Any]], Optional[int], Optional[Any]) -> Dict[str, Any] """Sets user-defined metadata for the specified file as one or more @@ -681,6 +690,7 @@ def set_file_metadata(self, metadata=None, timeout=None, **kwargs): # type: igno except StorageErrorException as error: process_storage_error(error) + @distributed_trace def upload_range( # type: ignore self, data, # type: bytes start_range, # type: int @@ -738,6 +748,7 @@ def upload_range( # type: ignore except StorageErrorException as error: process_storage_error(error) + @distributed_trace def get_ranges( # type: ignore self, start_range=None, # type: Optional[int] end_range=None, # type: Optional[int] @@ -779,6 +790,7 @@ def get_ranges( # type: ignore process_storage_error(error) return [{'start': b.start, 'end': b.end} for b in ranges] + @distributed_trace def clear_range( # type: ignore self, start_range, # type: int end_range, # type: int @@ -823,6 +835,7 @@ def clear_range( # type: ignore except StorageErrorException as error: process_storage_error(error) + @distributed_trace def resize_file(self, size, timeout=None, **kwargs): # type: ignore # type: (int, Optional[int], Optional[Any]) -> Dict[str, Any] """Resizes a file to the specified size. @@ -843,6 +856,7 @@ def resize_file(self, size, timeout=None, **kwargs): # type: ignore except StorageErrorException as error: process_storage_error(error) + @distributed_trace def list_handles(self, marker=None, timeout=None, **kwargs): """Lists handles for file. @@ -863,6 +877,7 @@ def list_handles(self, marker=None, timeout=None, **kwargs): return HandlesPaged( command, results_per_page=results_per_page, marker=marker) + @distributed_trace def close_handles( self, handle=None, # type: Union[str, HandleItem] timeout=None, # type: Optional[int] diff --git a/sdk/storage/azure-storage-file/azure/storage/file/file_service_client.py b/sdk/storage/azure-storage-file/azure/storage/file/file_service_client.py index 3c0ee513b260..d064d5294517 100644 --- a/sdk/storage/azure-storage-file/azure/storage/file/file_service_client.py +++ b/sdk/storage/azure-storage-file/azure/storage/file/file_service_client.py @@ -14,6 +14,7 @@ except ImportError: from urlparse import urlparse # type: ignore +from azure.core.tracing.decorator import distributed_trace from ._shared.shared_access_signature import SharedAccessSignature from ._shared.models import Services from ._shared.base_client import StorageAccountHostsMixin, parse_connection_str, parse_query @@ -194,6 +195,7 @@ def generate_shared_access_signature( return sas.generate_account( Services.FILE, resource_types, permission, expiry, start=start, ip=ip, protocol=protocol) # type: ignore + @distributed_trace def get_service_properties(self, timeout=None, **kwargs): # type(Optional[int]) -> Dict[str, Any] """Gets the properties of a storage account's File service, including @@ -216,6 +218,7 @@ def get_service_properties(self, timeout=None, **kwargs): except StorageErrorException as error: process_storage_error(error) + @distributed_trace def set_service_properties( self, hour_metrics=None, # type: Optional[Metrics] minute_metrics=None, # type: Optional[Metrics] @@ -263,6 +266,7 @@ def set_service_properties( except StorageErrorException as error: process_storage_error(error) + @distributed_trace def list_shares( self, name_starts_with=None, # type: Optional[str] include_metadata=False, # type: Optional[bool] @@ -314,6 +318,7 @@ def list_shares( return SharePropertiesPaged( command, prefix=name_starts_with, results_per_page=results_per_page, marker=marker) + @distributed_trace def create_share( self, share_name, # type: str metadata=None, # type: Optional[Dict[str, str]] @@ -349,6 +354,7 @@ def create_share( share.create_share(metadata, quota, timeout, **kwargs) return share + @distributed_trace def delete_share( self, share_name, # type: Union[ShareProperties, str] delete_snapshots=False, # type: Optional[bool] diff --git a/sdk/storage/azure-storage-file/azure/storage/file/share_client.py b/sdk/storage/azure-storage-file/azure/storage/file/share_client.py index ff5b183f49be..4fd917d31c3e 100644 --- a/sdk/storage/azure-storage-file/azure/storage/file/share_client.py +++ b/sdk/storage/azure-storage-file/azure/storage/file/share_client.py @@ -14,7 +14,7 @@ from urllib2 import quote, unquote # type: ignore import six - +from azure.core.tracing.decorator import distributed_trace from ._shared.shared_access_signature import FileSharedAccessSignature from ._shared.base_client import StorageAccountHostsMixin, parse_connection_str, parse_query from ._shared.request_handlers import add_metadata_headers, serialize_iso @@ -280,6 +280,7 @@ def get_file_client(self, file_path): self.url, file_path=file_path, snapshot=self.snapshot, credential=self.credential, _hosts=self._hosts, _configuration=self._config, _pipeline=self._pipeline, _location_mode=self._location_mode) + @distributed_trace def create_share( # type: ignore self, metadata=None, # type: Optional[Dict[str, str]] quota=None, # type: Optional[int] @@ -322,6 +323,7 @@ def create_share( # type: ignore except StorageErrorException as error: process_storage_error(error) + @distributed_trace def create_snapshot( # type: ignore self, metadata=None, # type: Optional[Dict[str, str]] timeout=None, # type: Optional[int] @@ -365,6 +367,7 @@ def create_snapshot( # type: ignore except StorageErrorException as error: process_storage_error(error) + @distributed_trace def delete_share( self, delete_snapshots=False, # type: Optional[bool] timeout=None, # type: Optional[int] @@ -400,6 +403,7 @@ def delete_share( except StorageErrorException as error: process_storage_error(error) + @distributed_trace def get_share_properties(self, timeout=None, **kwargs): # type: (Optional[int], Any) -> ShareProperties """Returns all user-defined metadata and system properties for the @@ -431,6 +435,7 @@ def get_share_properties(self, timeout=None, **kwargs): props.snapshot = self.snapshot return props # type: ignore + @distributed_trace def set_share_quota(self, quota, timeout=None, **kwargs): # type: ignore # type: (int, Optional[int], Any) -> Dict[str, Any] """Sets the quota for the share. @@ -460,6 +465,7 @@ def set_share_quota(self, quota, timeout=None, **kwargs): # type: ignore except StorageErrorException as error: process_storage_error(error) + @distributed_trace def set_share_metadata(self, metadata, timeout=None, **kwargs): # type: ignore # type: (Dict[str, Any], Optional[int], Any) -> Dict[str, Any] """Sets the metadata for the share. @@ -495,6 +501,7 @@ def set_share_metadata(self, metadata, timeout=None, **kwargs): # type: ignore except StorageErrorException as error: process_storage_error(error) + @distributed_trace def get_share_access_policy(self, timeout=None, **kwargs): # type: (Optional[int], **Any) -> Dict[str, Any] """Gets the permissions for the share. The permissions @@ -517,6 +524,7 @@ def get_share_access_policy(self, timeout=None, **kwargs): 'signed_identifiers': identifiers or [] } + @distributed_trace def set_share_access_policy(self, signed_identifiers=None, timeout=None, **kwargs): # type: ignore # type: (Optional[Dict[str, Optional[AccessPolicy]]], Optional[int], **Any) -> Dict[str, str] """Sets the permissions for the share, or stored access @@ -555,6 +563,7 @@ def set_share_access_policy(self, signed_identifiers=None, timeout=None, **kwarg except StorageErrorException as error: process_storage_error(error) + @distributed_trace def get_share_stats(self, timeout=None, **kwargs): # type: ignore # type: (Optional[int], **Any) -> int """Gets the approximate size of the data stored on the share in bytes. @@ -575,6 +584,7 @@ def get_share_stats(self, timeout=None, **kwargs): # type: ignore except StorageErrorException as error: process_storage_error(error) + @distributed_trace def list_directories_and_files( # type: ignore self, directory_name=None, # type: Optional[str] name_starts_with=None, # type: Optional[str] @@ -610,6 +620,7 @@ def list_directories_and_files( # type: ignore return directory.list_directories_and_files( name_starts_with=name_starts_with, marker=marker, timeout=timeout, **kwargs) + @distributed_trace def create_directory(self, directory_name, metadata=None, timeout=None, **kwargs): # type: (str, Optional[Dict[str, Any]], Optional[int], Any) -> DirectoryClient """Creates a directory in the share and returns a client to interact diff --git a/sdk/storage/azure-storage-queue/azure/storage/queue/_shared/uploads.py b/sdk/storage/azure-storage-queue/azure/storage/queue/_shared/uploads.py index 2b269fb1d0ba..4f61476cde21 100644 --- a/sdk/storage/azure-storage-queue/azure/storage/queue/_shared/uploads.py +++ b/sdk/storage/azure-storage-queue/azure/storage/queue/_shared/uploads.py @@ -20,7 +20,7 @@ _LARGE_BLOB_UPLOAD_MAX_READ_BUFFER_SIZE = 4 * 1024 * 1024 -_ERROR_VALUE_SHOULD_BE_SEEKABLE_STREAM = '{0} should be a seekable file-like/io.IOBase type stream object.' +_ERROR_VALUE_SHOULD_BE_SEEKABLE_STREAM = "{0} should be a seekable file-like/io.IOBase type stream object." def _parallel_uploads(executor, uploader, pending, running): @@ -150,7 +150,7 @@ def __init__(self, service, total_size, chunk_size, stream, parallel, encryptor= def get_chunk_streams(self): index = 0 while True: - data = b'' + data = b"" read_size = self.chunk_size # Buffer until we either reach the end of the stream or get a whole chunk. @@ -159,12 +159,12 @@ def get_chunk_streams(self): read_size = min(self.chunk_size - len(data), self.total_size - (index + len(data))) temp = self.stream.read(read_size) if not isinstance(temp, six.binary_type): - raise TypeError('Blob data should be of type bytes.') + raise TypeError("Blob data should be of type bytes.") data += temp or b"" # We have read an empty string and so are at the end # of the buffer or we have read a full chunk. - if temp == b'' or len(data) == self.chunk_size: + if temp == b"" or len(data) == self.chunk_size: break if len(data) == self.chunk_size: @@ -247,7 +247,8 @@ def _upload_chunk(self, chunk_offset, chunk_data): chunk_data, data_stream_total=self.total_size, upload_stream_current=self.progress_total, - **self.request_options) + **self.request_options + ) return block_id def _upload_substream_block(self, block_id, block_stream): @@ -258,7 +259,8 @@ def _upload_substream_block(self, block_id, block_stream): block_stream, data_stream_total=self.total_size, upload_stream_current=self.progress_total, - **self.request_options) + **self.request_options + ) finally: block_stream.close() return block_id @@ -270,7 +272,7 @@ def _is_chunk_empty(self, chunk_data): # read until non-zero byte is encountered # if reached the end without returning, then chunk_data is all 0's for each_byte in chunk_data: - if each_byte not in [0, b'\x00']: + if each_byte not in [0, b"\x00"]: return False return True @@ -278,7 +280,7 @@ def _upload_chunk(self, chunk_offset, chunk_data): # avoid uploading the empty pages if not self._is_chunk_empty(chunk_data): chunk_end = chunk_offset + len(chunk_data) - 1 - content_range = 'bytes={0}-{1}'.format(chunk_offset, chunk_end) + content_range = "bytes={0}-{1}".format(chunk_offset, chunk_end) computed_md5 = None self.response_headers = self.service.upload_pages( chunk_data, @@ -288,7 +290,8 @@ def _upload_chunk(self, chunk_offset, chunk_data): cls=return_response_headers, data_stream_total=self.total_size, upload_stream_current=self.progress_total, - **self.request_options) + **self.request_options + ) if not self.parallel and self.request_options.get('modified_access_conditions'): self.request_options['modified_access_conditions'].if_match = self.response_headers['etag'] @@ -310,7 +313,7 @@ def _upload_chunk(self, chunk_offset, chunk_data): upload_stream_current=self.progress_total, **self.request_options ) - self.current_length = int(self.response_headers['blob_append_offset']) + self.current_length = int(self.response_headers["blob_append_offset"]) else: self.request_options['append_position_access_conditions'].append_position = \ self.current_length + chunk_offset @@ -360,8 +363,9 @@ def __init__(self, wrapped_stream, stream_begin_index, length, lockObj): # we must avoid buffering more than necessary, and also not use up too much memory # so the max buffer size is capped at 4MB - self._max_buffer_size = length if length < _LARGE_BLOB_UPLOAD_MAX_READ_BUFFER_SIZE \ - else _LARGE_BLOB_UPLOAD_MAX_READ_BUFFER_SIZE + self._max_buffer_size = ( + length if length < _LARGE_BLOB_UPLOAD_MAX_READ_BUFFER_SIZE else _LARGE_BLOB_UPLOAD_MAX_READ_BUFFER_SIZE + ) self._current_buffer_start = 0 self._current_buffer_size = 0 super(SubStream, self).__init__() @@ -391,7 +395,7 @@ def read(self, n): # return fast if n == 0 or self._buffer.closed: - return b'' + return b"" # attempt first read from the read buffer and update position read_buffer = self._buffer.read(n) @@ -447,7 +451,7 @@ def seek(self, offset, whence=0): start_index = self._position elif whence is SEEK_END: start_index = self._length - offset = - offset + offset = -offset else: raise ValueError("Invalid argument for the 'whence' parameter.") @@ -490,10 +494,11 @@ class IterStreamer(object): """ File-like streaming iterator. """ - def __init__(self, generator, encoding='UTF-8'): + + def __init__(self, generator, encoding="UTF-8"): self.generator = generator self.iterator = iter(generator) - self.leftover = b'' + self.leftover = b"" self.encoding = encoding def __len__(self):