diff --git a/sdk/storage/azure-storage-blob/azure/storage/blob/__init__.py b/sdk/storage/azure-storage-blob/azure/storage/blob/__init__.py index 9164961ea10a..3a486093cb96 100644 --- a/sdk/storage/azure-storage-blob/azure/storage/blob/__init__.py +++ b/sdk/storage/azure-storage-blob/azure/storage/blob/__init__.py @@ -54,6 +54,7 @@ BlobQueryError, DelimitedJsonDialect, DelimitedTextDialect, + QuickQueryDialect, ArrowDialect, ArrowType, ObjectReplicationPolicy, @@ -210,6 +211,7 @@ def download_blob_from_url( 'BlobBlock', 'PageRange', 'AccessPolicy', + 'QuickQueryDialect', 'ContainerSasPermissions', 'BlobSasPermissions', 'ResourceTypes', diff --git a/sdk/storage/azure-storage-blob/azure/storage/blob/_blob_client.py b/sdk/storage/azure-storage-blob/azure/storage/blob/_blob_client.py index 6ec2fd469fbc..6dd57e7f6a38 100644 --- a/sdk/storage/azure-storage-blob/azure/storage/blob/_blob_client.py +++ b/sdk/storage/azure-storage-blob/azure/storage/blob/_blob_client.py @@ -55,7 +55,8 @@ upload_block_blob, upload_append_blob, upload_page_blob, _any_conditions) -from ._models import BlobType, BlobBlock, BlobProperties, BlobQueryError +from ._models import BlobType, BlobBlock, BlobProperties, BlobQueryError, QuickQueryDialect, \ + DelimitedJsonDialect, DelimitedTextDialect from ._download import StorageStreamDownloader from ._lease import BlobLeaseClient @@ -832,16 +833,28 @@ def _quick_query_options(self, query_expression, # type: (str, **Any) -> Dict[str, Any] delimiter = '\n' input_format = kwargs.pop('blob_format', None) - if input_format: + if input_format == QuickQueryDialect.DelimitedJsonDialect: + input_format = DelimitedJsonDialect() + if input_format == QuickQueryDialect.DelimitedTextDialect: + input_format = DelimitedTextDialect() + input_parquet_format = input_format == "ParquetDialect" + if input_format and not input_parquet_format: try: delimiter = input_format.lineterminator except AttributeError: try: delimiter = input_format.delimiter except AttributeError: - 
raise ValueError("The Type of blob_format can only be DelimitedTextDialect or DelimitedJsonDialect") + raise ValueError("The Type of blob_format can only be DelimitedTextDialect or " + "DelimitedJsonDialect or ParquetDialect") output_format = kwargs.pop('output_format', None) + if output_format == QuickQueryDialect.DelimitedJsonDialect: + output_format = DelimitedJsonDialect() + if output_format == QuickQueryDialect.DelimitedTextDialect: + output_format = DelimitedTextDialect() if output_format: + if output_format == "ParquetDialect": + raise ValueError("ParquetDialect is invalid as an output format.") try: delimiter = output_format.lineterminator except AttributeError: @@ -850,7 +863,7 @@ def _quick_query_options(self, query_expression, except AttributeError: pass else: - output_format = input_format + output_format = input_format if not input_parquet_format else None query_request = QueryRequest( expression=query_expression, input_serialization=serialize_query_format(input_format), @@ -894,14 +907,18 @@ def query_blob(self, query_expression, **kwargs): :keyword blob_format: Optional. Defines the serialization of the data currently stored in the blob. The default is to treat the blob data as CSV data formatted in the default dialect. This can be overridden with - a custom DelimitedTextDialect, or alternatively a DelimitedJsonDialect. + a custom DelimitedTextDialect, a DelimitedJsonDialect, or "ParquetDialect" (passed as a string or enum). + These dialects can be passed through their respective classes, the QuickQueryDialect enum, or as a string. :paramtype blob_format: ~azure.storage.blob.DelimitedTextDialect or ~azure.storage.blob.DelimitedJsonDialect + or ~azure.storage.blob.QuickQueryDialect or str :keyword output_format: Optional. Defines the output serialization for the data stream. By default the data will be returned - as it is represented in the blob. By providing an output format, the blob data will be reformatted - according to that profile. 
This value can be a DelimitedTextDialect or a DelimitedJsonDialect. - :paramtype output_format: ~azure.storage.blob.DelimitedTextDialect, ~azure.storage.blob.DelimitedJsonDialect - or list[~azure.storage.blob.ArrowDialect] + as it is represented in the blob (Parquet formats default to DelimitedTextDialect). + By providing an output format, the blob data will be reformatted according to that profile. + This value can be a DelimitedTextDialect, a DelimitedJsonDialect, or an ArrowDialect. + These dialects can be passed through their respective classes, the QuickQueryDialect enum, or as a string. + :paramtype output_format: ~azure.storage.blob.DelimitedTextDialect or ~azure.storage.blob.DelimitedJsonDialect + or list[~azure.storage.blob.ArrowDialect] or ~azure.storage.blob.QuickQueryDialect or str :keyword lease: Required if the blob has an active lease. Value can be a BlobLeaseClient object or the lease ID as a string. diff --git a/sdk/storage/azure-storage-blob/azure/storage/blob/_models.py b/sdk/storage/azure-storage-blob/azure/storage/blob/_models.py index adc61c8647d5..f52a1683c999 100644 --- a/sdk/storage/azure-storage-blob/azure/storage/blob/_models.py +++ b/sdk/storage/azure-storage-blob/azure/storage/blob/_models.py @@ -67,6 +67,14 @@ class PremiumPageBlobTier(str, Enum): P60 = 'P60' #: P60 Tier +class QuickQueryDialect(str, Enum): + """Specifies the quick query input/output dialect.""" + + DelimitedTextDialect = 'DelimitedTextDialect' + DelimitedJsonDialect = 'DelimitedJsonDialect' + ParquetDialect = 'ParquetDialect' + + class SequenceNumberAction(str, Enum): """Sequence number actions.""" diff --git a/sdk/storage/azure-storage-blob/azure/storage/blob/_serialize.py b/sdk/storage/azure-storage-blob/azure/storage/blob/_serialize.py index 4eb3fb4ecfa0..a200118d2418 100644 --- a/sdk/storage/azure-storage-blob/azure/storage/blob/_serialize.py +++ b/sdk/storage/azure-storage-blob/azure/storage/blob/_serialize.py @@ -163,7 +163,12 @@ def serialize_blob_tags(tags=None): 
def serialize_query_format(formater): - if isinstance(formater, DelimitedJsonDialect): + if formater == "ParquetDialect": + qq_format = QueryFormat( + type=QueryFormatType.PARQUET, + parquet_text_configuration=' ' + ) + elif isinstance(formater, DelimitedJsonDialect): serialization_settings = JsonTextConfiguration( record_separator=formater.delimiter ) @@ -196,5 +201,5 @@ def serialize_query_format(formater): elif not formater: return None else: - raise TypeError("Format must be DelimitedTextDialect or DelimitedJsonDialect.") + raise TypeError("Format must be DelimitedTextDialect or DelimitedJsonDialect or ParquetDialect.") return QuerySerialization(format=qq_format) diff --git a/sdk/storage/azure-storage-blob/tests/test_quick_query.py b/sdk/storage/azure-storage-blob/tests/test_quick_query.py index 2df72d50240a..b7d61b929908 100644 --- a/sdk/storage/azure-storage-blob/tests/test_quick_query.py +++ b/sdk/storage/azure-storage-blob/tests/test_quick_query.py @@ -6,6 +6,7 @@ # license information. 
# -------------------------------------------------------------------------- import base64 +import os import pytest @@ -15,11 +16,10 @@ BlobServiceClient, DelimitedTextDialect, DelimitedJsonDialect, - BlobQueryError ) # ------------------------------------------------------------------------------ -from azure.storage.blob._models import ArrowDialect, ArrowType +from azure.storage.blob._models import ArrowDialect, ArrowType, QuickQueryDialect CSV_DATA = b'Service,Package,Version,RepoPath,MissingDocs\r\nApp Configuration,' \ b'azure-data-appconfiguration,1,appconfiguration,FALSE\r\nEvent Hubs' \ @@ -937,4 +937,44 @@ def on_error(error): on_error=on_error, blob_format=input_format) + @GlobalStorageAccountPreparer() + def test_quick_query_input_in_parquet_format(self, resource_group, location, storage_account, storage_account_key): + # Arrange + bsc = BlobServiceClient( + self.account_url(storage_account, "blob"), + credential=storage_account_key) + self._setup(bsc) + expression = "select * from blobstorage where id < 1;" + expected_data = b"0,mdifjt55.ea3,mdifjt55.ea3\n" + + blob_name = self._get_blob_reference() + blob_client = bsc.get_blob_client(self.container_name, blob_name) + parquet_path = os.path.abspath(os.path.join(os.path.abspath(__file__), "..", "./resources/parquet.parquet")) + with open(parquet_path, "rb") as parquet_data: + blob_client.upload_blob(parquet_data, overwrite=True) + + reader = blob_client.query_blob(expression, blob_format=QuickQueryDialect.ParquetDialect) + real_data = reader.readall() + + self.assertEqual(real_data, expected_data) + + @GlobalStorageAccountPreparer() + def test_quick_query_output_in_parquet_format(self, resource_group, location, storage_account, storage_account_key): + # Arrange + bsc = BlobServiceClient( + self.account_url(storage_account, "blob"), + credential=storage_account_key) + self._setup(bsc) + expression = "SELECT * from BlobStorage" + + blob_name = self._get_blob_reference() + blob_client = 
bsc.get_blob_client(self.container_name, blob_name) + parquet_path = os.path.abspath(os.path.join(os.path.abspath(__file__), "..", "./resources/parquet.parquet")) + with open(parquet_path, "rb") as parquet_data: + blob_client.upload_blob(parquet_data, overwrite=True) + + with self.assertRaises(ValueError): + blob_client.query_blob( + expression, blob_format="ParquetDialect", output_format="ParquetDialect") + # ------------------------------------------------------------------------------ diff --git a/sdk/storage/azure-storage-file-datalake/azure/storage/filedatalake/__init__.py b/sdk/storage/azure-storage-file-datalake/azure/storage/filedatalake/__init__.py index 459c4c07b44a..99d2ef7b1826 100644 --- a/sdk/storage/azure-storage-file-datalake/azure/storage/filedatalake/__init__.py +++ b/sdk/storage/azure-storage-file-datalake/azure/storage/filedatalake/__init__.py @@ -31,6 +31,7 @@ DelimitedJsonDialect, ArrowDialect, ArrowType, + QuickQueryDialect, DataLakeFileQueryError, AccessControlChangeResult, AccessControlChangeCounters, @@ -93,6 +94,7 @@ 'DataLakeFileQueryError', 'ArrowDialect', 'ArrowType', + 'QuickQueryDialect', 'DataLakeFileQueryError', 'AnalyticsLogging', 'Metrics', diff --git a/sdk/storage/azure-storage-file-datalake/azure/storage/filedatalake/_data_lake_file_client.py b/sdk/storage/azure-storage-file-datalake/azure/storage/filedatalake/_data_lake_file_client.py index e15842dc3b6d..fe074b3a49db 100644 --- a/sdk/storage/azure-storage-file-datalake/azure/storage/filedatalake/_data_lake_file_client.py +++ b/sdk/storage/azure-storage-file-datalake/azure/storage/filedatalake/_data_lake_file_client.py @@ -724,16 +724,20 @@ def query_file(self, query_expression, **kwargs): :keyword file_format: Optional. Defines the serialization of the data currently stored in the file. The default is to treat the file data as CSV data formatted in the default dialect. This can be overridden with - a custom DelimitedTextDialect, or alternatively a DelimitedJsonDialect. 
+ a custom DelimitedTextDialect, a DelimitedJsonDialect, or "ParquetDialect" (passed as a string or enum). + These dialects can be passed through their respective classes, the QuickQueryDialect enum, or as a string. :paramtype file_format: - ~azure.storage.filedatalake.DelimitedTextDialect or ~azure.storage.filedatalake.DelimitedJsonDialect + ~azure.storage.filedatalake.DelimitedTextDialect or ~azure.storage.filedatalake.DelimitedJsonDialect or + ~azure.storage.filedatalake.QuickQueryDialect or str :keyword output_format: Optional. Defines the output serialization for the data stream. By default the data will be returned - as it is represented in the file. By providing an output format, the file data will be reformatted - according to that profile. This value can be a DelimitedTextDialect or a DelimitedJsonDialect. + as it is represented in the file. By providing an output format, + the file data will be reformatted according to that profile. + This value can be a DelimitedTextDialect, a DelimitedJsonDialect, or an ArrowDialect. + These dialects can be passed through their respective classes, the QuickQueryDialect enum, or as a string. :paramtype output_format: - ~azure.storage.filedatalake.DelimitedTextDialect, ~azure.storage.filedatalake.DelimitedJsonDialect - or list[~azure.storage.filedatalake.ArrowDialect] + ~azure.storage.filedatalake.DelimitedTextDialect or ~azure.storage.filedatalake.DelimitedJsonDialect + or list[~azure.storage.filedatalake.ArrowDialect] or ~azure.storage.filedatalake.QuickQueryDialect or str :keyword lease: Required if the file has an active lease. Value can be a DataLakeLeaseClient object or the lease ID as a string. 
diff --git a/sdk/storage/azure-storage-file-datalake/azure/storage/filedatalake/_models.py b/sdk/storage/azure-storage-file-datalake/azure/storage/filedatalake/_models.py index 24c31ad8ad4a..fd6cf88163c2 100644 --- a/sdk/storage/azure-storage-file-datalake/azure/storage/filedatalake/_models.py +++ b/sdk/storage/azure-storage-file-datalake/azure/storage/filedatalake/_models.py @@ -721,6 +721,14 @@ class ArrowDialect(BlobArrowDialect): """ +class QuickQueryDialect(str, Enum): + """Specifies the quick query input/output dialect.""" + + DelimitedTextDialect = 'DelimitedTextDialect' + DelimitedJsonDialect = 'DelimitedJsonDialect' + ParquetDialect = 'ParquetDialect' + + class ArrowType(str, Enum): INT64 = "int64" diff --git a/sdk/storage/azure-storage-file-datalake/setup.py b/sdk/storage/azure-storage-file-datalake/setup.py index 62b64edf60f5..7c4e0eea3688 100644 --- a/sdk/storage/azure-storage-file-datalake/setup.py +++ b/sdk/storage/azure-storage-file-datalake/setup.py @@ -95,7 +95,7 @@ install_requires=[ "azure-core<2.0.0,>=1.10.0", "msrest>=0.6.21", - "azure-storage-blob<13.0.0,>=12.8.0b1" + "azure-storage-blob<13.0.0,>=12.9.0b1" ], extras_require={ ":python_version<'3.0'": ['futures', 'azure-storage-nspkg<4.0.0,>=3.0.0'], diff --git a/sdk/storage/azure-storage-file-datalake/tests/test_quick_query.py b/sdk/storage/azure-storage-file-datalake/tests/test_quick_query.py index 4098867aea0a..281d20dd0e2f 100644 --- a/sdk/storage/azure-storage-file-datalake/tests/test_quick_query.py +++ b/sdk/storage/azure-storage-file-datalake/tests/test_quick_query.py @@ -6,14 +6,14 @@ # license information. 
# -------------------------------------------------------------------------- import base64 +import os import pytest from azure.storage.filedatalake import ( DelimitedTextDialect, DelimitedJsonDialect, - DataLakeFileQueryError, - ArrowDialect, ArrowType) + ArrowDialect, ArrowType, QuickQueryDialect) from testcase import ( StorageTestCase, @@ -866,4 +866,40 @@ def on_error(error): on_error=on_error, file_format=input_format) + @DataLakePreparer() + def test_quick_query_input_in_parquet_format(self, datalake_storage_account_name, datalake_storage_account_key): + # Arrange + self._setUp(datalake_storage_account_name, datalake_storage_account_key) + file_name = self._get_file_reference() + file_client = self.dsc.get_file_client(self.filesystem_name, file_name) + + expression = "select * from blobstorage where id < 1;" + expected_data = b"0,mdifjt55.ea3,mdifjt55.ea3\n" + + parquet_path = os.path.abspath(os.path.join(os.path.abspath(__file__), "..", "./resources/parquet.parquet")) + with open(parquet_path, "rb") as parquet_data: + file_client.upload_data(parquet_data, overwrite=True) + + reader = file_client.query_file(expression, file_format=QuickQueryDialect.ParquetDialect) + real_data = reader.readall() + + self.assertEqual(real_data, expected_data) + + @DataLakePreparer() + def test_quick_query_output_in_parquet_format(self, datalake_storage_account_name, datalake_storage_account_key): + # Arrange + self._setUp(datalake_storage_account_name, datalake_storage_account_key) + file_name = self._get_file_reference() + file_client = self.dsc.get_file_client(self.filesystem_name, file_name) + + expression = "SELECT * from BlobStorage" + parquet_path = os.path.abspath(os.path.join(os.path.abspath(__file__), "..", "./resources/parquet.parquet")) + with open(parquet_path, "rb") as parquet_data: + file_client.upload_data(parquet_data, overwrite=True) + + with self.assertRaises(ValueError): + file_client.query_file( + expression, file_format=QuickQueryDialect.ParquetDialect, + 
output_format=QuickQueryDialect.ParquetDialect) + # ------------------------------------------------------------------------------ diff --git a/shared_requirements.txt b/shared_requirements.txt index 59899dd72930..17cd6832259b 100644 --- a/shared_requirements.txt +++ b/shared_requirements.txt @@ -157,7 +157,7 @@ pyjwt>=1.7.1 #override azure-storage-file-share azure-core<2.0.0,>=1.10.0 #override azure-storage-file-datalake azure-core<2.0.0,>=1.10.0 #override azure-storage-file-datalake msrest>=0.6.21 -#override azure-storage-file-datalake azure-storage-blob<13.0.0,>=12.8.0b1 +#override azure-storage-file-datalake azure-storage-blob<13.0.0,>=12.9.0b1 #override azure-security-attestation msrest>=0.6.21 #override azure-security-attestation azure-core<2.0.0,>=1.8.2 #override azure-data-tables msrest>=0.6.19