@@ -20,6 +20,7 @@
from azure.core import Configuration
from azure.core.pipeline import Pipeline
from azure.core.pipeline.transport import RequestsTransport
from azure.core.pipeline.policies.distributed_tracing import DistributedTracingPolicy
from azure.core.pipeline.policies import (
RedirectPolicy,
ContentDecodePolicy,
@@ -175,6 +176,7 @@ def _create_pipeline(self, credential, **kwargs):
config.retry_policy,
config.logging_policy,
StorageResponseHook(**kwargs),
DistributedTracingPolicy(),
]
return config, Pipeline(config.transport, policies=policies)
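Note: a minimal, self-contained sketch (illustrative only, not the SDK's actual configuration) of the resulting pipeline shape. Because policies later in the list sit closer to the transport, appending `DistributedTracingPolicy` after the retry policy means every retry attempt is recorded as its own HTTP span.

```python
from azure.core.pipeline import Pipeline
from azure.core.pipeline.transport import RequestsTransport
from azure.core.pipeline.policies import RetryPolicy, NetworkTraceLoggingPolicy
from azure.core.pipeline.policies.distributed_tracing import DistributedTracingPolicy

policies = [
    RetryPolicy(),                # retries wrap everything below
    NetworkTraceLoggingPolicy(),
    DistributedTracingPolicy(),   # innermost: one HTTP span per attempt
]
pipeline = Pipeline(RequestsTransport(), policies=policies)
```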

@@ -15,6 +15,8 @@
from azure.core.pipeline.transport import AioHttpTransport as AsyncTransport
except ImportError:
from azure.core.pipeline.transport import AsyncioRequestsTransport as AsyncTransport

from azure.core.pipeline.policies.distributed_tracing import DistributedTracingPolicy
from azure.core.pipeline.policies import (
ContentDecodePolicy,
BearerTokenCredentialPolicy,
@@ -81,5 +83,6 @@ def _create_pipeline(self, credential, **kwargs):
config.retry_policy,
config.logging_policy,
AsyncStorageResponseHook(**kwargs),
DistributedTracingPolicy(),
]
return config, AsyncPipeline(config.transport, policies=policies)
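Note: the policy and the decorators below are no-ops until a tracer implementation is plugged in. A hedged sketch of how a caller opts in; the exact hook has shifted between azure-core preview releases, but released azure-core exposes it through the global settings object:

```python
from azure.core.settings import settings

# Assumption: the opencensus tracing plugin for azure-core is installed.
# Without a configured implementation, DistributedTracingPolicy emits no spans.
settings.tracing_implementation = "opencensus"
```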
@@ -9,6 +9,7 @@
from io import BytesIO

from azure.core.exceptions import HttpResponseError
from azure.core.tracing.context import tracing_context

from .request_handlers import validate_and_format_range_headers
from .response_handlers import process_storage_error, parse_length_from_content_range
@@ -454,7 +455,7 @@ def download_to_stream(self, stream, max_connections=1):
if max_connections > 1:
import concurrent.futures
executor = concurrent.futures.ThreadPoolExecutor(max_connections)
list(executor.map(downloader.process_chunk, downloader.get_chunk_offsets()))
list(executor.map(tracing_context.with_current_context(downloader.process_chunk), downloader.get_chunk_offsets()))
else:
for chunk in downloader.get_chunk_offsets():
downloader.process_chunk(chunk)
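Note: the wrapper is needed because the tracing context is carried per thread; workers spawned by `ThreadPoolExecutor` would otherwise start with no current span, detaching the chunk downloads from the operation's trace. `tracing_context.with_current_context` captures the submitting thread's context and returns a callable that restores it inside the worker. A standalone sketch of the same pattern:

```python
import concurrent.futures

from azure.core.tracing.context import tracing_context

def process_chunk(offset):
    # Placeholder for the real per-chunk download; spans started here now
    # nest under the submitting thread's current span.
    return offset

executor = concurrent.futures.ThreadPoolExecutor(max_workers=4)
wrapped = tracing_context.with_current_context(process_chunk)
chunk_offsets = range(0, 4 * 4096, 4096)
results = list(executor.map(wrapped, chunk_offsets))
```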
@@ -13,6 +13,8 @@

import six

from azure.core.tracing.context import tracing_context

from . import encode_base64, url_quote
from .request_handlers import get_length
from .response_handlers import return_response_headers
@@ -34,7 +36,7 @@ def _parallel_uploads(executor, uploader, pending, running):
except StopIteration:
break
else:
running.add(executor.submit(uploader.process_chunk, next_chunk))
running.add(executor.submit(tracing_context.with_current_context(uploader.process_chunk), next_chunk))

# Wait for the remaining uploads to finish
done, _running = futures.wait(running)
@@ -79,7 +81,7 @@ def upload_data_chunks(
executor = futures.ThreadPoolExecutor(max_connections)
upload_tasks = uploader.get_chunk_streams()
running_futures = [
executor.submit(uploader.process_chunk, u)
executor.submit(tracing_context.with_current_context(uploader.process_chunk), u)
for u in islice(upload_tasks, 0, max_connections)
]
range_ids = _parallel_uploads(executor, uploader, upload_tasks, running_futures)
@@ -115,7 +117,7 @@ def upload_substream_blocks(
executor = futures.ThreadPoolExecutor(max_connections)
upload_tasks = uploader.get_substream_blocks()
running_futures = [
executor.submit(uploader.process_substream_block, u)
executor.submit(tracing_context.with_current_context(uploader.process_substream_block), u)
for u in islice(upload_tasks, 0, max_connections)
]
return _parallel_uploads(executor, uploader, upload_tasks, running_futures)
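Note: the same fix on the upload side, applied at every `executor.submit` call site so chunk and substream uploads stay parented to the caller's span. The equivalent standalone pattern, submit-based rather than map-based:

```python
from concurrent.futures import ThreadPoolExecutor, wait

from azure.core.tracing.context import tracing_context

def upload_chunk(chunk_id):
    return chunk_id  # placeholder for the real per-chunk upload

executor = ThreadPoolExecutor(max_workers=2)
running = [
    executor.submit(tracing_context.with_current_context(upload_chunk), i)
    for i in range(4)
]
done, _pending = wait(running)
```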
@@ -11,6 +11,9 @@

from azure.core.polling import async_poller

from azure.core.tracing.decorator import distributed_trace
from azure.core.tracing.decorator_async import distributed_trace_async

from .._generated.aio import AzureFileStorage
from .._generated.version import VERSION
from .._generated.models import StorageErrorException
@@ -132,6 +135,7 @@ def get_subdirectory_client(self, directory_name, **kwargs):
_hosts=self._hosts, _configuration=self._config, _pipeline=self._pipeline,
_location_mode=self._location_mode, loop=self._loop, **kwargs)

@distributed_trace_async
async def create_directory( # type: ignore
self, metadata=None, # type: Optional[Dict[str, str]]
timeout=None, # type: Optional[int]
@@ -167,6 +171,7 @@ async def create_directory( # type: ignore
except StorageErrorException as error:
process_storage_error(error)
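Note: this decorator pattern repeats on every public method through the rest of the file. Each awaited operation gets `@distributed_trace_async`, while pager-returning methods (which stay synchronous even on the async client) get `@distributed_trace`; each call then becomes a span that the pipeline's per-request HTTP spans nest under. A minimal sketch on a hypothetical client:

```python
from azure.core.tracing.decorator import distributed_trace
from azure.core.tracing.decorator_async import distributed_trace_async

class ExampleDirectoryClient:
    """Hypothetical client, for illustration only."""

    @distributed_trace
    def list_items(self, **kwargs):
        # Pager-returning methods stay synchronous, hence the sync decorator.
        ...

    @distributed_trace_async
    async def create_item(self, name, **kwargs):
        # Awaited operations take the async variant; each call becomes a
        # span, and the pipeline's HTTP spans nest beneath it.
        ...
```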

@distributed_trace_async
async def delete_directory(self, timeout=None, **kwargs):
# type: (Optional[int], **Any) -> None
"""Marks the directory for deletion. The directory is
@@ -189,6 +194,7 @@ async def delete_directory(self, timeout=None, **kwargs):
except StorageErrorException as error:
process_storage_error(error)

@distributed_trace
def list_directories_and_files(self, name_starts_with=None, marker=None, timeout=None, **kwargs):
# type: (Optional[str], Optional[str], Optional[int], **Any) -> DirectoryProperties
"""Lists all the directories and files under the directory.
@@ -222,6 +228,7 @@ def list_directories_and_files(self, name_starts_with=None, marker=None, timeout=None, **kwargs):
return DirectoryPropertiesPaged(
command, prefix=name_starts_with, results_per_page=results_per_page, marker=marker)

@distributed_trace
def list_handles(self, marker=None, recursive=False, timeout=None, **kwargs):
"""Lists opened handles on a directory or a file under the directory.

@@ -247,6 +254,7 @@ def list_handles(self, marker=None, recursive=False, timeout=None, **kwargs):
return HandlesPaged(
command, results_per_page=results_per_page, marker=marker)

@distributed_trace_async
async def close_handles(
self, handle=None, # type: Union[str, HandleItem]
recursive=False, # type: bool
@@ -295,6 +303,7 @@ async def close_handles(
None,
polling_method)

@distributed_trace_async
async def get_directory_properties(self, timeout=None, **kwargs):
# type: (Optional[int], Any) -> DirectoryProperties
"""Returns all user-defined metadata and system properties for the
@@ -315,6 +324,7 @@ async def get_directory_properties(self, timeout=None, **kwargs):
process_storage_error(error)
return response # type: ignore

@distributed_trace_async
async def set_directory_metadata(self, metadata, timeout=None, **kwargs): # type: ignore
# type: (Dict[str, Any], Optional[int], Any) -> Dict[str, Any]
"""Sets the metadata for the directory.
@@ -342,6 +352,7 @@ async def set_directory_metadata(self, metadata, timeout=None, **kwargs): # type: ignore
except StorageErrorException as error:
process_storage_error(error)

@distributed_trace_async
async def create_subdirectory(
self, directory_name, # type: str
metadata=None, #type: Optional[Dict[str, Any]]
@@ -374,6 +385,7 @@ async def create_subdirectory(
await subdir.create_directory(metadata=metadata, timeout=timeout, **kwargs)
return subdir # type: ignore

@distributed_trace_async
async def delete_subdirectory(
self, directory_name, # type: str
timeout=None, # type: Optional[int]
@@ -399,6 +411,7 @@ async def delete_subdirectory(
subdir = self.get_subdirectory_client(directory_name)
await subdir.delete_directory(timeout=timeout, **kwargs)

@distributed_trace_async
async def upload_file(
self, file_name, # type: str
data, # type: Any
@@ -463,6 +476,7 @@ async def upload_file(
**kwargs)
return file_client # type: ignore

@distributed_trace_async
async def delete_file(
self, file_name, # type: str
timeout=None, # type: Optional[int]