Merged

Changes from all commits (23 commits)
1aba4e9  properly clear context (#6448) - SuyogSoti, Jul 23, 2019
ed75ccb  added decorators to keyvault-keys (#6381) - SuyogSoti, Jul 23, 2019
1dd1fa1  Remove default development settings (#6376) - chlowell, Jul 23, 2019
9b40e0a  Trace decorator optional arguments (#6459) - SuyogSoti, Jul 23, 2019
eefbc79  Use tracing policy (#6390) - SuyogSoti, Jul 23, 2019
dddded7  Remove config public API for KV keys (#6418) - lmazuel, Jul 23, 2019
1767d86  Update README.md - lmazuel, Jul 24, 2019
d9a7166  decorate polling - SuyogSoti, Jul 24, 2019
d9df0fd  decorate lease - SuyogSoti, Jul 24, 2019
233e5cb  decorate container client - SuyogSoti, Jul 24, 2019
ef8e862  trace blob service client - SuyogSoti, Jul 24, 2019
cf95acb  polling method does not need to be decorated - SuyogSoti, Jul 24, 2019
275b0b5  decorate blob client - SuyogSoti, Jul 24, 2019
25e9594  add policy - SuyogSoti, Jul 24, 2019
d83cb20  added policy to the blob - SuyogSoti, Jul 24, 2019
7311073  Update root README.md - kaerm, Jul 25, 2019
25cd7c7  get rid of util and propagate context properly. - SuyogSoti, Jul 25, 2019
24da4c5  Merge branch 'master' of https://github.com/Azure/azure-sdk-for-pytho… - SuyogSoti, Jul 25, 2019
a4f4794  Merge branch 'storage-preview2' of https://github.com/Azure/azure-sdk… - SuyogSoti, Jul 25, 2019
5b058c7  get rid of functions that do not do a network call - SuyogSoti, Jul 25, 2019
1bd07ed  propagate context - SuyogSoti, Jul 25, 2019
ce94519  Revert "propagate context" - SuyogSoti, Jul 25, 2019
af38e33  Merge branch 'storage-preview2' of https://github.com/Azure/azure-sdk… - SuyogSoti, Jul 25, 2019
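
Taken together, the commits wire distributed tracing through the SDK: client methods gain tracing decorators, the HTTP pipeline gains a `DistributedTracingPolicy`, and the tracing context is propagated into worker threads. A minimal sketch of the decorator half (the import path follows the azure-core preview of this period; the client class and method are illustrative, not code from this PR):

```python
from azure.core.tracing.decorator import distributed_trace


class ExampleClient(object):
    """Illustrative client; not part of this PR."""

    @distributed_trace
    def get_secret(self, name, **kwargs):
        # The decorator opens a span around the method call, so the
        # pipeline's tracing policy can parent its HTTP spans beneath it.
        return self._client.get(name, **kwargs)  # _client is hypothetical
```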
README.md (9 changes: 7 additions & 2 deletions)

@@ -53,9 +53,14 @@ Libraries which enable you to provision specific resources. They are responsible
## Contributing
For details on contributing to this repository, see the [contributing guide](CONTRIBUTING.md).

## Code of Conduct
This project welcomes contributions and suggestions. Most contributions require you to agree to a Contributor License Agreement (CLA) declaring that you have the right to, and actually do, grant us the rights to use your contribution. For details, visit
https://cla.microsoft.com.

This project has adopted the [Microsoft Open Source Code of Conduct](https://opensource.microsoft.com/codeofconduct/). For more information see the [Code of Conduct FAQ](https://opensource.microsoft.com/codeofconduct/faq/) or contact [[email protected]](mailto:[email protected]) with any additional questions or comments.
When you submit a pull request, a CLA-bot will automatically determine whether you need to provide a CLA and decorate the PR appropriately (e.g., label, comment). Simply follow the instructions provided by the bot. You will only need to do this once across all repositories using our CLA.

This project has adopted the [Microsoft Open Source Code of Conduct](https://opensource.microsoft.com/codeofconduct/).
For more information see the [Code of Conduct FAQ](https://opensource.microsoft.com/codeofconduct/faq/)
or contact [[email protected]](mailto:[email protected]) with any additional questions or comments.

![Impressions](https://azure-sdk-impressions.azurewebsites.net/api/impressions/azure-sdk-for-python%2FREADME.png)

@@ -5,26 +5,31 @@
# --------------------------------------------------------------------------

from typing import ( # pylint: disable=unused-import
Union, Optional, Any, Iterable, Dict, List, Type, Tuple,
TYPE_CHECKING
Union,
Optional,
Any,
Iterable,
Dict,
List,
Type,
Tuple,
TYPE_CHECKING,
)
import logging

try:
from urllib.parse import parse_qs, quote
except ImportError:
from urlparse import parse_qs # type: ignore
from urllib2 import quote # type: ignore
from urlparse import parse_qs # type: ignore
from urllib2 import quote # type: ignore

import six

from azure.core import Configuration
from azure.core.pipeline import Pipeline
from azure.core.pipeline.transport import RequestsTransport
from azure.core.pipeline.policies import (
RedirectPolicy,
ContentDecodePolicy,
BearerTokenCredentialPolicy,
ProxyPolicy)
from azure.core.pipeline.policies.distributed_tracing import DistributedTracingPolicy
from azure.core.pipeline.policies import RedirectPolicy, ContentDecodePolicy, BearerTokenCredentialPolicy, ProxyPolicy

from .constants import STORAGE_OAUTH_SCOPE, SERVICE_HOST_BASE, DEFAULT_SOCKET_TIMEOUT
from .models import LocationMode
@@ -39,55 +44,51 @@
StorageLoggingPolicy,
StorageHosts,
QueueMessagePolicy,
ExponentialRetry)
ExponentialRetry,
)


_LOGGER = logging.getLogger(__name__)
_SERVICE_PARAMS = {
'blob': {'primary': 'BlobEndpoint', 'secondary': 'BlobSecondaryEndpoint'},
'queue': {'primary': 'QueueEndpoint', 'secondary': 'QueueSecondaryEndpoint'},
'file': {'primary': 'FileEndpoint', 'secondary': 'FileSecondaryEndpoint'},
"blob": {"primary": "BlobEndpoint", "secondary": "BlobSecondaryEndpoint"},
"queue": {"primary": "QueueEndpoint", "secondary": "QueueSecondaryEndpoint"},
"file": {"primary": "FileEndpoint", "secondary": "FileSecondaryEndpoint"},
}


class StorageAccountHostsMixin(object):

def __init__(
self, parsed_url, # type: Any
service, # type: str
credential=None, # type: Optional[Any]
**kwargs # type: Any
):
self,
parsed_url, # type: Any
service, # type: str
credential=None, # type: Optional[Any]
**kwargs # type: Any
):
# type: (...) -> None
self._location_mode = kwargs.get('_location_mode', LocationMode.PRIMARY)
self._hosts = kwargs.get('_hosts')
self._location_mode = kwargs.get("_location_mode", LocationMode.PRIMARY)
self._hosts = kwargs.get("_hosts")
self.scheme = parsed_url.scheme

if service not in ['blob', 'queue', 'file']:
if service not in ["blob", "queue", "file"]:
raise ValueError("Invalid service: {}".format(service))
account = parsed_url.netloc.split(".{}.core.".format(service))
secondary_hostname = None
self.credential = format_shared_key_credential(account, credential)
if self.scheme.lower() != 'https' and hasattr(self.credential, 'get_token'):
if self.scheme.lower() != "https" and hasattr(self.credential, "get_token"):
raise ValueError("Token credential is only supported with HTTPS.")
if hasattr(self.credential, 'account_name'):
secondary_hostname = "{}-secondary.{}.{}".format(
self.credential.account_name, service, SERVICE_HOST_BASE)
if hasattr(self.credential, "account_name"):
secondary_hostname = "{}-secondary.{}.{}".format(self.credential.account_name, service, SERVICE_HOST_BASE)

if not self._hosts:
if len(account) > 1:
secondary_hostname = parsed_url.netloc.replace(
account[0],
account[0] + '-secondary')
if kwargs.get('secondary_hostname'):
secondary_hostname = kwargs['secondary_hostname']
self._hosts = {
LocationMode.PRIMARY: parsed_url.netloc,
LocationMode.SECONDARY: secondary_hostname}

self.require_encryption = kwargs.get('require_encryption', False)
self.key_encryption_key = kwargs.get('key_encryption_key')
self.key_resolver_function = kwargs.get('key_resolver_function')
secondary_hostname = parsed_url.netloc.replace(account[0], account[0] + "-secondary")
if kwargs.get("secondary_hostname"):
secondary_hostname = kwargs["secondary_hostname"]
self._hosts = {LocationMode.PRIMARY: parsed_url.netloc, LocationMode.SECONDARY: secondary_hostname}

self.require_encryption = kwargs.get("require_encryption", False)
self.key_encryption_key = kwargs.get("key_encryption_key")
self.key_resolver_function = kwargs.get("key_resolver_function")
self._config, self._pipeline = self._create_pipeline(self.credential, storage_sdk=service, **kwargs)

def __enter__(self):
@@ -134,32 +135,32 @@ def location_mode(self, value):
def _format_query_string(self, sas_token, credential, snapshot=None, share_snapshot=None):
query_str = "?"
if snapshot:
query_str += 'snapshot={}&'.format(self.snapshot)
query_str += "snapshot={}&".format(self.snapshot)
if share_snapshot:
query_str += 'sharesnapshot={}&'.format(self.snapshot)
query_str += "sharesnapshot={}&".format(self.snapshot)
if sas_token and not credential:
query_str += sas_token
elif is_credential_sastoken(credential):
query_str += credential.lstrip('?')
query_str += credential.lstrip("?")
credential = None
return query_str.rstrip('?&'), credential
return query_str.rstrip("?&"), credential

def _create_pipeline(self, credential, **kwargs):
# type: (Any, **Any) -> Tuple[Configuration, Pipeline]
credential_policy = None
if hasattr(credential, 'get_token'):
if hasattr(credential, "get_token"):
credential_policy = BearerTokenCredentialPolicy(credential, STORAGE_OAUTH_SCOPE)
elif isinstance(credential, SharedKeyCredentialPolicy):
credential_policy = credential
elif credential is not None:
raise TypeError("Unsupported credential: {}".format(credential))

config = kwargs.get('_configuration') or create_configuration(**kwargs)
if kwargs.get('_pipeline'):
return config, kwargs['_pipeline']
config.transport = kwargs.get('transport') # type: ignore
if 'connection_timeout' not in kwargs:
kwargs['connection_timeout'] = DEFAULT_SOCKET_TIMEOUT
config = kwargs.get("_configuration") or create_configuration(**kwargs)
if kwargs.get("_pipeline"):
return config, kwargs["_pipeline"]
config.transport = kwargs.get("transport") # type: ignore
if "connection_timeout" not in kwargs:
kwargs["connection_timeout"] = DEFAULT_SOCKET_TIMEOUT
if not config.transport:
config.transport = RequestsTransport(**kwargs)
policies = [
@@ -175,6 +176,7 @@ def _create_pipeline(self, credential, **kwargs):
config.retry_policy,
config.logging_policy,
StorageResponseHook(**kwargs),
DistributedTracingPolicy(),
]
return config, Pipeline(config.transport, policies=policies)
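
The new `DistributedTracingPolicy` slots in at the end of the per-request policy chain, after the response hook. A rough standalone sketch of a pipeline built the same way (only public azure-core types, using the same import path as the diff above; the policy list is abbreviated):

```python
from azure.core.pipeline import Pipeline
from azure.core.pipeline.transport import RequestsTransport
from azure.core.pipeline.policies import RedirectPolicy, ProxyPolicy
from azure.core.pipeline.policies.distributed_tracing import DistributedTracingPolicy

# Abbreviated chain; the real one above also carries storage-specific
# header, retry, logging, and hook policies.
transport = RequestsTransport()
policies = [
    ProxyPolicy(),
    RedirectPolicy(),
    DistributedTracingPolicy(),  # emits a span per HTTP request
]
pipeline = Pipeline(transport, policies=policies)
```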

@@ -183,61 +185,53 @@ def format_shared_key_credential(account, credential):
if isinstance(credential, six.string_types):
if len(account) < 2:
raise ValueError("Unable to determine account name for shared key credential.")
credential = {
'account_name': account[0],
'account_key': credential
}
credential = {"account_name": account[0], "account_key": credential}
if isinstance(credential, dict):
if 'account_name' not in credential:
if "account_name" not in credential:
raise ValueError("Shared key credential missing 'account_name")
if 'account_key' not in credential:
if "account_key" not in credential:
raise ValueError("Shared key credential missing 'account_key")
return SharedKeyCredentialPolicy(**credential)
return credential


def parse_connection_str(conn_str, credential, service):
conn_str = conn_str.rstrip(';')
conn_settings = dict([s.split('=', 1) for s in conn_str.split(';')]) # pylint: disable=consider-using-dict-comprehension
conn_str = conn_str.rstrip(";")
conn_settings = dict(
[s.split("=", 1) for s in conn_str.split(";")]
) # pylint: disable=consider-using-dict-comprehension
endpoints = _SERVICE_PARAMS[service]
primary = None
secondary = None
if not credential:
try:
credential = {
'account_name': conn_settings['AccountName'],
'account_key': conn_settings['AccountKey']
}
credential = {"account_name": conn_settings["AccountName"], "account_key": conn_settings["AccountKey"]}
except KeyError:
credential = conn_settings.get('SharedAccessSignature')
if endpoints['primary'] in conn_settings:
primary = conn_settings[endpoints['primary']]
if endpoints['secondary'] in conn_settings:
secondary = conn_settings[endpoints['secondary']]
credential = conn_settings.get("SharedAccessSignature")
if endpoints["primary"] in conn_settings:
primary = conn_settings[endpoints["primary"]]
if endpoints["secondary"] in conn_settings:
secondary = conn_settings[endpoints["secondary"]]
else:
if endpoints['secondary'] in conn_settings:
if endpoints["secondary"] in conn_settings:
raise ValueError("Connection string specifies only secondary endpoint.")
try:
primary = "{}://{}.{}.{}".format(
conn_settings['DefaultEndpointsProtocol'],
conn_settings['AccountName'],
conn_settings["DefaultEndpointsProtocol"],
conn_settings["AccountName"],
service,
conn_settings['EndpointSuffix']
conn_settings["EndpointSuffix"],
)
secondary = "{}-secondary.{}.{}".format(
conn_settings['AccountName'],
service,
conn_settings['EndpointSuffix']
conn_settings["AccountName"], service, conn_settings["EndpointSuffix"]
)
except KeyError:
pass

if not primary:
try:
primary = "https://{}.{}.{}".format(
conn_settings['AccountName'],
service,
conn_settings.get('EndpointSuffix', SERVICE_HOST_BASE)
conn_settings["AccountName"], service, conn_settings.get("EndpointSuffix", SERVICE_HOST_BASE)
)
except KeyError:
raise ValueError("Connection string missing required connection details.")
@@ -249,28 +243,28 @@ def create_configuration(**kwargs):
config = Configuration(**kwargs)
config.headers_policy = StorageHeadersPolicy(**kwargs)
config.user_agent_policy = StorageUserAgentPolicy(**kwargs)
config.retry_policy = kwargs.get('retry_policy') or ExponentialRetry(**kwargs)
config.retry_policy = kwargs.get("retry_policy") or ExponentialRetry(**kwargs)
config.logging_policy = StorageLoggingPolicy(**kwargs)
config.proxy_policy = ProxyPolicy(**kwargs)

# Storage settings
config.max_single_put_size = kwargs.get('max_single_put_size', 64 * 1024 * 1024)
config.max_single_put_size = kwargs.get("max_single_put_size", 64 * 1024 * 1024)
config.copy_polling_interval = 15

# Block blob uploads
config.max_block_size = kwargs.get('max_block_size', 4 * 1024 * 1024)
config.min_large_block_upload_threshold = kwargs.get('min_large_block_upload_threshold', 4 * 1024 * 1024 + 1)
config.use_byte_buffer = kwargs.get('use_byte_buffer', False)
config.max_block_size = kwargs.get("max_block_size", 4 * 1024 * 1024)
config.min_large_block_upload_threshold = kwargs.get("min_large_block_upload_threshold", 4 * 1024 * 1024 + 1)
config.use_byte_buffer = kwargs.get("use_byte_buffer", False)

# Page blob uploads
config.max_page_size = kwargs.get('max_page_size', 4 * 1024 * 1024)
config.max_page_size = kwargs.get("max_page_size", 4 * 1024 * 1024)

# Blob downloads
config.max_single_get_size = kwargs.get('max_single_get_size', 32 * 1024 * 1024)
config.max_chunk_get_size = kwargs.get('max_chunk_get_size', 4 * 1024 * 1024)
config.max_single_get_size = kwargs.get("max_single_get_size", 32 * 1024 * 1024)
config.max_chunk_get_size = kwargs.get("max_chunk_get_size", 4 * 1024 * 1024)

# File uploads
config.max_range_size = kwargs.get('max_range_size', 4 * 1024 * 1024)
config.max_range_size = kwargs.get("max_range_size", 4 * 1024 * 1024)
return config
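
Every tunable above is a plain kwarg with a default, so callers can override transfer sizes at construction time. A small usage sketch:

```python
# Raise the block size for large uploads; unspecified settings keep the
# defaults shown above (e.g. max_single_put_size stays at 64 MiB).
config = create_configuration(max_block_size=8 * 1024 * 1024)
assert config.max_block_size == 8 * 1024 * 1024
```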


@@ -280,9 +274,9 @@ def parse_query(query_str):
sas_params = ["{}={}".format(k, quote(v)) for k, v in parsed_query.items() if k in sas_values]
sas_token = None
if sas_params:
sas_token = '&'.join(sas_params)
sas_token = "&".join(sas_params)

snapshot = parsed_query.get('snapshot') or parsed_query.get('sharesnapshot')
snapshot = parsed_query.get("snapshot") or parsed_query.get("sharesnapshot")
return snapshot, sas_token


Expand All @@ -291,7 +285,7 @@ def is_credential_sastoken(credential):
return False

sas_values = QueryStringConstants.to_list()
parsed_query = parse_qs(credential.lstrip('?'))
parsed_query = parse_qs(credential.lstrip("?"))
if parsed_query and all([k in sas_values for k in parsed_query.keys()]):
return True
return False
@@ -9,6 +9,7 @@
from io import BytesIO

from azure.core.exceptions import HttpResponseError
from azure.core.tracing.context import tracing_context

from .models import ModifiedAccessConditions
from .request_handlers import validate_and_format_range_headers
@@ -452,7 +453,7 @@ def download_to_stream(self, stream, max_connections=1):
if max_connections > 1:
import concurrent.futures
executor = concurrent.futures.ThreadPoolExecutor(max_connections)
list(executor.map(downloader.process_chunk, downloader.get_chunk_offsets()))
list(executor.map(tracing_context.with_current_context(downloader.process_chunk), downloader.get_chunk_offsets()))
else:
for chunk in downloader.get_chunk_offsets():
downloader.process_chunk(chunk)
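
The wrapper matters because `ThreadPoolExecutor` workers do not inherit the thread-local span opened on the calling thread; `tracing_context.with_current_context` captures that span at wrap time and reinstates it inside each worker. A minimal sketch of the pattern (the chunk processor and offsets are illustrative):

```python
import concurrent.futures

from azure.core.tracing.context import tracing_context

def process_chunk(offset):
    # Illustrative worker: runs on a pool thread, where the caller's
    # span would otherwise be invisible.
    ...

executor = concurrent.futures.ThreadPoolExecutor(max_workers=4)
offsets = [0, 4 * 1024 * 1024, 8 * 1024 * 1024]
# Wrap the callable so each chunk's work is parented to the caller's span.
results = list(executor.map(tracing_context.with_current_context(process_chunk), offsets))
```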
@@ -12,6 +12,7 @@

import six

from azure.core.tracing.context import tracing_context
from .models import ModifiedAccessConditions
from . import encode_base64, url_quote
from .request_handlers import get_length
@@ -38,7 +39,7 @@ def upload_file_chunks(file_service, file_size, block_size, stream, max_connecti
if max_connections > 1:
import concurrent.futures
executor = concurrent.futures.ThreadPoolExecutor(max_connections)
range_ids = list(executor.map(uploader.process_chunk, uploader.get_chunk_offsets()))
range_ids = list(executor.map(tracing_context.with_current_context(uploader.process_chunk), uploader.get_chunk_offsets()))
else:
if file_size is not None:
range_ids = [uploader.process_chunk(start) for start in uploader.get_chunk_offsets()]
@@ -101,7 +102,7 @@ def upload_blob_chunks(blob_service, blob_size, block_size, stream, max_connecti
running_futures.remove(f)

chunk_throttler.acquire()
future = executor.submit(uploader.process_chunk, chunk)
future = executor.submit(tracing_context.with_current_context(uploader.process_chunk), chunk)

# Calls callback upon completion (even if the callback was added after the Future task is done).
future.add_done_callback(lambda x: chunk_throttler.release())
@@ -146,7 +147,7 @@ def upload_blob_substream_blocks(blob_service, blob_size, block_size, stream, ma
if max_connections > 1:
import concurrent.futures
executor = concurrent.futures.ThreadPoolExecutor(max_connections)
range_ids = list(executor.map(uploader.process_substream_block, uploader.get_substream_blocks()))
range_ids = list(executor.map(tracing_context.with_current_context(uploader.process_substream_block), uploader.get_substream_blocks()))
else:
range_ids = [uploader.process_substream_block(result) for result in uploader.get_substream_blocks()]
