Skip to content
Merged
Show file tree
Hide file tree
Changes from 10 commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 2 additions & 0 deletions sdk/storage/azure-storage-blob/azure/storage/blob/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -54,6 +54,7 @@
BlobQueryError,
DelimitedJsonDialect,
DelimitedTextDialect,
QuickQueryDialect,
ArrowDialect,
ArrowType,
ObjectReplicationPolicy,
Expand Down Expand Up @@ -210,6 +211,7 @@ def download_blob_from_url(
'BlobBlock',
'PageRange',
'AccessPolicy',
'QuickQueryDialect',
'ContainerSasPermissions',
'BlobSasPermissions',
'ResourceTypes',
Expand Down
35 changes: 26 additions & 9 deletions sdk/storage/azure-storage-blob/azure/storage/blob/_blob_client.py
Original file line number Diff line number Diff line change
Expand Up @@ -55,7 +55,8 @@
upload_block_blob,
upload_append_blob,
upload_page_blob, _any_conditions)
from ._models import BlobType, BlobBlock, BlobProperties, BlobQueryError
from ._models import BlobType, BlobBlock, BlobProperties, BlobQueryError, QuickQueryDialect, \
DelimitedJsonDialect, DelimitedTextDialect
from ._download import StorageStreamDownloader
from ._lease import BlobLeaseClient

Expand Down Expand Up @@ -832,16 +833,28 @@ def _quick_query_options(self, query_expression,
# type: (str, **Any) -> Dict[str, Any]
delimiter = '\n'
input_format = kwargs.pop('blob_format', None)
if input_format:
if input_format == QuickQueryDialect.DelimitedJsonDialect:
input_format = DelimitedJsonDialect()
if input_format == QuickQueryDialect.DelimitedTextDialect:
input_format = DelimitedTextDialect()
input_parquet_format = input_format == "ParquetDialect"
if input_format and not input_parquet_format:
try:
delimiter = input_format.lineterminator
except AttributeError:
try:
delimiter = input_format.delimiter
except AttributeError:
raise ValueError("The Type of blob_format can only be DelimitedTextDialect or DelimitedJsonDialect")
raise ValueError("The Type of blob_format can only be DelimitedTextDialect or "
"DelimitedJsonDialect or ParquetDialect")
output_format = kwargs.pop('output_format', None)
if output_format == QuickQueryDialect.DelimitedJsonDialect:
output_format = DelimitedJsonDialect()
if output_format == QuickQueryDialect.DelimitedTextDialect:
output_format = DelimitedTextDialect()
if output_format:
if output_format == "ParquetDialect":
raise ValueError("ParquetDialect is invalid as an output format.")
try:
delimiter = output_format.lineterminator
except AttributeError:
Expand All @@ -850,7 +863,7 @@ def _quick_query_options(self, query_expression,
except AttributeError:
pass
else:
output_format = input_format
output_format = input_format if not input_parquet_format else None
query_request = QueryRequest(
expression=query_expression,
input_serialization=serialize_query_format(input_format),
Expand Down Expand Up @@ -894,14 +907,18 @@ def query_blob(self, query_expression, **kwargs):
:keyword blob_format:
Optional. Defines the serialization of the data currently stored in the blob. The default is to
treat the blob data as CSV data formatted in the default dialect. This can be overridden with
a custom DelimitedTextDialect, or alternatively a DelimitedJsonDialect.
a custom DelimitedTextDialect, or DelimitedJsonDialect or "ParquetDialect" (passed as a string or enum).
These dialects can be passed through their respective classes, the QuickQueryDialect enum or as a string
:paramtype blob_format: ~azure.storage.blob.DelimitedTextDialect or ~azure.storage.blob.DelimitedJsonDialect
or ~azure.storage.blob.QuickQueryDialect or str
:keyword output_format:
Optional. Defines the output serialization for the data stream. By default the data will be returned
as it is represented in the blob. By providing an output format, the blob data will be reformatted
according to that profile. This value can be a DelimitedTextDialect or a DelimitedJsonDialect.
:paramtype output_format: ~azure.storage.blob.DelimitedTextDialect, ~azure.storage.blob.DelimitedJsonDialect
or list[~azure.storage.blob.ArrowDialect]
as it is represented in the blob (Parquet formats default to DelimitedTextDialect).
By providing an output format, the blob data will be reformatted according to that profile.
This value can be a DelimitedTextDialect or a DelimitedJsonDialect or ArrowDialect.
These dialects can be passed through their respective classes, the QuickQueryDialect enum or as a string
:paramtype output_format: ~azure.storage.blob.DelimitedTextDialect or ~azure.storage.blob.DelimitedJsonDialect
or list[~azure.storage.blob.ArrowDialect] or ~azure.storage.blob.QuickQueryDialect or str
:keyword lease:
Required if the blob has an active lease. Value can be a BlobLeaseClient object
or the lease ID as a string.
Expand Down
8 changes: 8 additions & 0 deletions sdk/storage/azure-storage-blob/azure/storage/blob/_models.py
Original file line number Diff line number Diff line change
Expand Up @@ -67,6 +67,14 @@ class PremiumPageBlobTier(str, Enum):
P60 = 'P60' #: P60 Tier


class QuickQueryDialect(str, Enum):
    """Specifies the quick query input/output dialect.

    Because this is a ``str`` subclass, members compare equal to their plain
    string values, so callers may pass either the enum member or the string
    (e.g. ``"ParquetDialect"``) as the ``blob_format``/``output_format``
    keyword of ``query_blob``.
    """

    DelimitedTextDialect = 'DelimitedTextDialect'  # CSV-style delimited text
    DelimitedJsonDialect = 'DelimitedJsonDialect'  # line-delimited JSON records
    # Apache Parquet; valid only as an input format — using it as an output
    # format raises ValueError in the client.
    ParquetDialect = 'ParquetDialect'


class SequenceNumberAction(str, Enum):
"""Sequence number actions."""

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -163,7 +163,12 @@ def serialize_blob_tags(tags=None):


def serialize_query_format(formater):
if isinstance(formater, DelimitedJsonDialect):
if formater == "ParquetDialect":
qq_format = QueryFormat(
type=QueryFormatType.PARQUET,
parquet_text_configuration=' '
)
elif isinstance(formater, DelimitedJsonDialect):
serialization_settings = JsonTextConfiguration(
record_separator=formater.delimiter
)
Expand Down Expand Up @@ -196,5 +201,5 @@ def serialize_query_format(formater):
elif not formater:
return None
else:
raise TypeError("Format must be DelimitedTextDialect or DelimitedJsonDialect.")
raise TypeError("Format must be DelimitedTextDialect or DelimitedJsonDialect or ParquetDialect.")
return QuerySerialization(format=qq_format)
44 changes: 42 additions & 2 deletions sdk/storage/azure-storage-blob/tests/test_quick_query.py
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,7 @@
# license information.
# --------------------------------------------------------------------------
import base64
import os

import pytest

Expand All @@ -15,11 +16,10 @@
BlobServiceClient,
DelimitedTextDialect,
DelimitedJsonDialect,
BlobQueryError
)

# ------------------------------------------------------------------------------
from azure.storage.blob._models import ArrowDialect, ArrowType
from azure.storage.blob._models import ArrowDialect, ArrowType, QuickQueryDialect

CSV_DATA = b'Service,Package,Version,RepoPath,MissingDocs\r\nApp Configuration,' \
b'azure-data-appconfiguration,1,appconfiguration,FALSE\r\nEvent Hubs' \
Expand Down Expand Up @@ -937,4 +937,44 @@ def on_error(error):
on_error=on_error,
blob_format=input_format)

@GlobalStorageAccountPreparer()
def test_quick_query_input_in_parquet_format(self, resource_group, location, storage_account, storage_account_key):
    """Query a Parquet-formatted blob via ``blob_format=ParquetDialect`` and
    verify the result is serialized with the default delimited-text output."""
    # Arrange
    bsc = BlobServiceClient(
        self.account_url(storage_account, "blob"),
        credential=storage_account_key)
    self._setup(bsc)
    expression = "select * from blobstorage where id < 1;"
    # Expected row from the checked-in parquet fixture, rendered as CSV
    # (Parquet input defaults to delimited-text output).
    expected_data = b"0,mdifjt55.ea3,mdifjt55.ea3\n"

    blob_name = self._get_blob_reference()
    blob_client = bsc.get_blob_client(self.container_name, blob_name)
    # Binary parquet fixture stored alongside the tests; resolved relative to
    # this file so the test is cwd-independent.
    parquet_path = os.path.abspath(os.path.join(os.path.abspath(__file__), "..", "./resources/parquet.parquet"))
    with open(parquet_path, "rb") as parquet_data:
        blob_client.upload_blob(parquet_data, overwrite=True)

    # Act: exercise the enum form of the new Parquet input dialect.
    reader = blob_client.query_blob(expression, blob_format=QuickQueryDialect.ParquetDialect)
    real_data = reader.readall()

    self.assertEqual(real_data, expected_data)

@GlobalStorageAccountPreparer()
def test_quick_query_output_in_parquet_format(self, resource_group, location, storage_account, storage_account_key):
    """Requesting ``output_format="ParquetDialect"`` must raise ValueError —
    Parquet is an input-only dialect for blob quick query."""
    # Arrange
    bsc = BlobServiceClient(
        self.account_url(storage_account, "blob"),
        credential=storage_account_key)
    self._setup(bsc)
    expression = "SELECT * from BlobStorage"

    blob_name = self._get_blob_reference()
    blob_client = bsc.get_blob_client(self.container_name, blob_name)
    # Upload the binary parquet fixture stored alongside the tests.
    parquet_path = os.path.abspath(os.path.join(os.path.abspath(__file__), "..", "./resources/parquet.parquet"))
    with open(parquet_path, "rb") as parquet_data:
        blob_client.upload_blob(parquet_data, overwrite=True)

    # Act/Assert: the string form of the dialect is rejected client-side
    # before any request is sent.
    with self.assertRaises(ValueError):
        blob_client.query_blob(
            expression, blob_format="ParquetDialect", output_format="ParquetDialect")
# ------------------------------------------------------------------------------
Original file line number Diff line number Diff line change
Expand Up @@ -31,6 +31,7 @@
DelimitedJsonDialect,
ArrowDialect,
ArrowType,
QuickQueryDialect,
DataLakeFileQueryError,
AccessControlChangeResult,
AccessControlChangeCounters,
Expand Down Expand Up @@ -93,6 +94,7 @@
'DataLakeFileQueryError',
'ArrowDialect',
'ArrowType',
'QuickQueryDialect',
'DataLakeFileQueryError',
'AnalyticsLogging',
'Metrics',
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -724,16 +724,20 @@ def query_file(self, query_expression, **kwargs):
:keyword file_format:
Optional. Defines the serialization of the data currently stored in the file. The default is to
treat the file data as CSV data formatted in the default dialect. This can be overridden with
a custom DelimitedTextDialect, or alternatively a DelimitedJsonDialect.
a custom DelimitedTextDialect, or DelimitedJsonDialect or "ParquetDialect" (passed as a string or enum).
These dialects can be passed through their respective classes, the QuickQueryDialect enum or as a string.
:paramtype file_format:
~azure.storage.filedatalake.DelimitedTextDialect or ~azure.storage.filedatalake.DelimitedJsonDialect
~azure.storage.filedatalake.DelimitedTextDialect or ~azure.storage.filedatalake.DelimitedJsonDialect or
~azure.storage.filedatalake.QuickQueryDialect or str
:keyword output_format:
Optional. Defines the output serialization for the data stream. By default the data will be returned
as it is represented in the file. By providing an output format, the file data will be reformatted
according to that profile. This value can be a DelimitedTextDialect or a DelimitedJsonDialect.
as it is represented in the file. By providing an output format,
the file data will be reformatted according to that profile.
This value can be a DelimitedTextDialect or a DelimitedJsonDialect or ArrowDialect.
These dialects can be passed through their respective classes, the QuickQueryDialect enum or as a string.
:paramtype output_format:
~azure.storage.filedatalake.DelimitedTextDialect, ~azure.storage.filedatalake.DelimitedJsonDialect
or list[~azure.storage.filedatalake.ArrowDialect]
~azure.storage.filedatalake.DelimitedTextDialect or ~azure.storage.filedatalake.DelimitedJsonDialect
or list[~azure.storage.filedatalake.ArrowDialect] or ~azure.storage.filedatalake.QuickQueryDialect or str
:keyword lease:
Required if the file has an active lease. Value can be a DataLakeLeaseClient object
or the lease ID as a string.
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -721,6 +721,14 @@ class ArrowDialect(BlobArrowDialect):
"""


class QuickQueryDialect(str, Enum):
    """Specifies the quick query input/output dialect.

    Because this is a ``str`` subclass, members compare equal to their plain
    string values, so callers may pass either the enum member or the string
    as the ``file_format``/``output_format`` keyword of ``query_file``.
    """

    DelimitedTextDialect = 'DelimitedTextDialect'  # CSV-style delimited text
    DelimitedJsonDialect = 'DelimitedJsonDialect'  # line-delimited JSON records
    # Apache Parquet; valid only as an input format — using it as an output
    # format raises ValueError in the underlying blob client.
    ParquetDialect = 'ParquetDialect'


class ArrowType(str, Enum):

INT64 = "int64"
Expand Down
2 changes: 1 addition & 1 deletion sdk/storage/azure-storage-file-datalake/setup.py
Original file line number Diff line number Diff line change
Expand Up @@ -95,7 +95,7 @@
install_requires=[
"azure-core<2.0.0,>=1.10.0",
"msrest>=0.6.21",
"azure-storage-blob<13.0.0,>=12.8.0b1"
"azure-storage-blob<13.0.0,>=12.9.0b1"
],
extras_require={
":python_version<'3.0'": ['futures', 'azure-storage-nspkg<4.0.0,>=3.0.0'],
Expand Down
40 changes: 38 additions & 2 deletions sdk/storage/azure-storage-file-datalake/tests/test_quick_query.py
Original file line number Diff line number Diff line change
Expand Up @@ -6,14 +6,14 @@
# license information.
# --------------------------------------------------------------------------
import base64
import os

import pytest

from azure.storage.filedatalake import (
DelimitedTextDialect,
DelimitedJsonDialect,
DataLakeFileQueryError,
ArrowDialect, ArrowType)
ArrowDialect, ArrowType, QuickQueryDialect)

from testcase import (
StorageTestCase,
Expand Down Expand Up @@ -866,4 +866,40 @@ def on_error(error):
on_error=on_error,
file_format=input_format)

@DataLakePreparer()
def test_quick_query_input_in_parquet_format(self, datalake_storage_account_name, datalake_storage_account_key):
    """Query a Parquet-formatted file via ``file_format=ParquetDialect`` and
    verify the result is serialized with the default delimited-text output."""
    # Arrange
    self._setUp(datalake_storage_account_name, datalake_storage_account_key)
    file_name = self._get_file_reference()
    file_client = self.dsc.get_file_client(self.filesystem_name, file_name)

    expression = "select * from blobstorage where id < 1;"
    # Expected row from the checked-in parquet fixture, rendered as CSV.
    expected_data = b"0,mdifjt55.ea3,mdifjt55.ea3\n"

    # Binary parquet fixture stored alongside the tests; resolved relative to
    # this file so the test is cwd-independent.
    parquet_path = os.path.abspath(os.path.join(os.path.abspath(__file__), "..", "./resources/parquet.parquet"))
    with open(parquet_path, "rb") as parquet_data:
        file_client.upload_data(parquet_data, overwrite=True)

    # Act: exercise the enum form of the new Parquet input dialect.
    reader = file_client.query_file(expression, file_format=QuickQueryDialect.ParquetDialect)
    real_data = reader.readall()

    self.assertEqual(real_data, expected_data)

@DataLakePreparer()
def test_quick_query_output_in_parquet_format(self, datalake_storage_account_name, datalake_storage_account_key):
    """Requesting ``output_format=ParquetDialect`` must raise ValueError —
    Parquet is an input-only dialect for file quick query."""
    # Arrange
    self._setUp(datalake_storage_account_name, datalake_storage_account_key)
    file_name = self._get_file_reference()
    file_client = self.dsc.get_file_client(self.filesystem_name, file_name)

    expression = "SELECT * from BlobStorage"
    # Upload the binary parquet fixture stored alongside the tests.
    parquet_path = os.path.abspath(os.path.join(os.path.abspath(__file__), "..", "./resources/parquet.parquet"))
    with open(parquet_path, "rb") as parquet_data:
        file_client.upload_data(parquet_data, overwrite=True)

    # Act/Assert: rejected client-side before any request is sent.
    with self.assertRaises(ValueError):
        file_client.query_file(
            expression, file_format=QuickQueryDialect.ParquetDialect,
            output_format=QuickQueryDialect.ParquetDialect)

# ------------------------------------------------------------------------------
2 changes: 1 addition & 1 deletion shared_requirements.txt
Original file line number Diff line number Diff line change
Expand Up @@ -157,7 +157,7 @@ pyjwt>=1.7.1
#override azure-storage-file-share azure-core<2.0.0,>=1.10.0
#override azure-storage-file-datalake azure-core<2.0.0,>=1.10.0
#override azure-storage-file-datalake msrest>=0.6.21
#override azure-storage-file-datalake azure-storage-blob<13.0.0,>=12.8.0b1
#override azure-storage-file-datalake azure-storage-blob<13.0.0,>=12.9.0b1
#override azure-security-attestation msrest>=0.6.0
#override azure-security-attestation azure-core<2.0.0,>=1.8.2
#override azure-data-tables msrest>=0.6.19
Expand Down