Skip to content
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -7,7 +7,7 @@
import sys


X_MS_VERSION = '2018-03-28'
X_MS_VERSION = '2018-06-17'

# Socket timeout in seconds
DEFAULT_SOCKET_TIMEOUT = 20
Expand Down

Large diffs are not rendered by default.

Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,7 @@
# --------------------------------------------------------------------------

from typing import ( # pylint: disable=unused-import
Union, Optional, Any, Iterable, Dict, List, Type, Tuple,
Optional, Any, Tuple,
TYPE_CHECKING
)
import base64
Expand Down Expand Up @@ -58,11 +58,7 @@


if TYPE_CHECKING:
from datetime import datetime
from azure.core.pipeline.transport import HttpTransport
from azure.core.pipeline.policies import HTTPPolicy
from azure.core.exceptions import AzureError


_LOGGER = logging.getLogger(__name__)

Expand All @@ -88,6 +84,13 @@ class _QueryStringConstants(object):
END_RK = 'erk'
SIGNED_RESOURCE_TYPES = 'srt'
SIGNED_SERVICES = 'ss'
SIGNED_TIMESTAMP = 'snapshot'
SIGNED_OID = 'skoid'
SIGNED_TID = 'sktid'
SIGNED_KEY_START = 'skt'
SIGNED_KEY_EXPIRY = 'ske'
SIGNED_KEY_SERVICE = 'sks'
SIGNED_KEY_VERSION = 'skv'

@staticmethod
def to_list():
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -18,7 +18,7 @@

import six

from ._shared.shared_access_signature import BlobSharedAccessSignature
from ._shared.shared_access_signature import ResourceSharedAccessSignature
from ._shared.encryption import _generate_blob_encryption_data
from ._shared.upload_chunking import IterStreamer
from ._shared.utils import (
Expand Down Expand Up @@ -298,12 +298,12 @@ def generate_shared_access_signature(
"""
if not hasattr(self.credential, 'account_key') or not self.credential.account_key:
raise ValueError("No account SAS key available.")
sas = BlobSharedAccessSignature(self.credential.account_name, self.credential.account_key)
sas = ResourceSharedAccessSignature(self.credential.account_name, self.credential.account_key)
return sas.generate_blob(
self.container_name,
self.blob_name,
permission,
expiry,
permission=permission,
expiry=expiry,
start=start,
policy_id=policy_id,
ip=ip,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -18,7 +18,7 @@

import six

from ._shared.shared_access_signature import BlobSharedAccessSignature
from ._shared.shared_access_signature import ResourceSharedAccessSignature
from ._shared.utils import (
StorageAccountHostsMixin,
process_storage_error,
Expand Down Expand Up @@ -263,7 +263,7 @@ def generate_shared_access_signature(
"""
if not hasattr(self.credential, 'account_key') and not self.credential.account_key:
raise ValueError("No account SAS key available.")
sas = BlobSharedAccessSignature(self.credential.account_name, self.credential.account_key)
sas = ResourceSharedAccessSignature(self.credential.account_name, self.credential.account_key)
return sas.generate_container(
self.container_name,
permission=permission,
Expand Down
33 changes: 33 additions & 0 deletions sdk/storage/azure-storage-blob/azure/storage/blob/models.py
Original file line number Diff line number Diff line change
Expand Up @@ -1013,3 +1013,36 @@ def __str__(self):
BlobPermissions.DELETE = BlobPermissions(delete=True)
BlobPermissions.READ = BlobPermissions(read=True)
BlobPermissions.WRITE = BlobPermissions(write=True)


class UserDelegationKey(object):
"""
Represents a user delegation key, provided to the user by Azure Storage
based on their Azure Active Directory access token.

The fields are saved as simple strings since the user does not have to interact with this object;
to generate an identify SAS, the user can simply pass it to the right API.

:ivar str signed_oid:
Object ID of this token.
:ivar str signed_tid:
Tenant ID of the tenant that issued this token.
:ivar str signed_start:
The datetime this token becomes valid.
:ivar str signed_expiry:
The datetime this token expires.
:ivar str signed_service:
What service this key is valid for.
:ivar str signed_version:
The version identifier of the REST service that created this token.
:ivar str value:
The user delegation key.
"""
def __init__(self):
self.signed_oid = None
self.signed_tid = None
self.signed_start = None
self.signed_expiry = None
self.signed_service = None
self.signed_version = None
self.value = None
31 changes: 31 additions & 0 deletions sdk/storage/azure-storage-blob/tests/test_common_blob.py
Original file line number Diff line number Diff line change
Expand Up @@ -1190,6 +1190,37 @@ def test_sas_access_blob(self):
# Assert
self.assertEqual(self.byte_data, content)

def test_sas_access_blob_snapshot(self):
# SAS URL is calculated from storage key, so this test runs live only
if TestMode.need_recording_file(self.test_mode):
return

# Arrange
blob_name = self._create_block_blob()
blob_client = self.bsc.get_blob_client(self.container_name, blob_name)
blob_snapshot = blob_client.create_snapshot()
blob_snapshot_client = self.bsc.get_blob_client(self.container_name, blob_name, snapshot=blob_snapshot)

token = blob_snapshot_client.generate_shared_access_signature(
permission=BlobPermissions.READ + BlobPermissions.DELETE,
expiry=datetime.utcnow() + timedelta(hours=1),
)

service = BlobClient(blob_snapshot_client.url, credential=token)

# Act
snapshot_content = service.download_blob().content_as_bytes()

# Assert
self.assertEqual(self.byte_data, snapshot_content)

# Act
service.delete_blob()

# Assert
with self.assertRaises(ResourceNotFoundError):
service.get_blob_properties()

@record
def test_sas_signed_identifier(self):
# SAS URL is calculated from storage key, so this test runs live only
Expand Down
Loading