Skip to content

Commit c81e0a5

Browse files
[STG78] Added Quick Query Parquet Dialect (#18904)
* Added Quick Query Dialect * fixed ci * fix ci path * anna comments * removed parquet type * reverted versions * reverted versions * fixed dep
1 parent d668202 commit c81e0a5

File tree

11 files changed

+145
-23
lines changed

11 files changed

+145
-23
lines changed

sdk/storage/azure-storage-blob/azure/storage/blob/__init__.py

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -54,6 +54,7 @@
5454
BlobQueryError,
5555
DelimitedJsonDialect,
5656
DelimitedTextDialect,
57+
QuickQueryDialect,
5758
ArrowDialect,
5859
ArrowType,
5960
ObjectReplicationPolicy,
@@ -210,6 +211,7 @@ def download_blob_from_url(
210211
'BlobBlock',
211212
'PageRange',
212213
'AccessPolicy',
214+
'QuickQueryDialect',
213215
'ContainerSasPermissions',
214216
'BlobSasPermissions',
215217
'ResourceTypes',

sdk/storage/azure-storage-blob/azure/storage/blob/_blob_client.py

Lines changed: 26 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -55,7 +55,8 @@
5555
upload_block_blob,
5656
upload_append_blob,
5757
upload_page_blob, _any_conditions)
58-
from ._models import BlobType, BlobBlock, BlobProperties, BlobQueryError
58+
from ._models import BlobType, BlobBlock, BlobProperties, BlobQueryError, QuickQueryDialect, \
59+
DelimitedJsonDialect, DelimitedTextDialect
5960
from ._download import StorageStreamDownloader
6061
from ._lease import BlobLeaseClient
6162

@@ -832,16 +833,28 @@ def _quick_query_options(self, query_expression,
832833
# type: (str, **Any) -> Dict[str, Any]
833834
delimiter = '\n'
834835
input_format = kwargs.pop('blob_format', None)
835-
if input_format:
836+
if input_format == QuickQueryDialect.DelimitedJsonDialect:
837+
input_format = DelimitedJsonDialect()
838+
if input_format == QuickQueryDialect.DelimitedTextDialect:
839+
input_format = DelimitedTextDialect()
840+
input_parquet_format = input_format == "ParquetDialect"
841+
if input_format and not input_parquet_format:
836842
try:
837843
delimiter = input_format.lineterminator
838844
except AttributeError:
839845
try:
840846
delimiter = input_format.delimiter
841847
except AttributeError:
842-
raise ValueError("The Type of blob_format can only be DelimitedTextDialect or DelimitedJsonDialect")
848+
raise ValueError("The Type of blob_format can only be DelimitedTextDialect or "
849+
"DelimitedJsonDialect or ParquetDialect")
843850
output_format = kwargs.pop('output_format', None)
851+
if output_format == QuickQueryDialect.DelimitedJsonDialect:
852+
output_format = DelimitedJsonDialect()
853+
if output_format == QuickQueryDialect.DelimitedTextDialect:
854+
output_format = DelimitedTextDialect()
844855
if output_format:
856+
if output_format == "ParquetDialect":
857+
raise ValueError("ParquetDialect is invalid as an output format.")
845858
try:
846859
delimiter = output_format.lineterminator
847860
except AttributeError:
@@ -850,7 +863,7 @@ def _quick_query_options(self, query_expression,
850863
except AttributeError:
851864
pass
852865
else:
853-
output_format = input_format
866+
output_format = input_format if not input_parquet_format else None
854867
query_request = QueryRequest(
855868
expression=query_expression,
856869
input_serialization=serialize_query_format(input_format),
@@ -894,14 +907,18 @@ def query_blob(self, query_expression, **kwargs):
894907
:keyword blob_format:
895908
Optional. Defines the serialization of the data currently stored in the blob. The default is to
896909
treat the blob data as CSV data formatted in the default dialect. This can be overridden with
897-
a custom DelimitedTextDialect, or alternatively a DelimitedJsonDialect.
910+
a custom DelimitedTextDialect, or DelimitedJsonDialect or "ParquetDialect" (passed as a string or enum).
911+
These dialects can be passed through their respective classes, the QuickQueryDialect enum or as a string
898912
:paramtype blob_format: ~azure.storage.blob.DelimitedTextDialect or ~azure.storage.blob.DelimitedJsonDialect
913+
or ~azure.storage.blob.QuickQueryDialect or str
899914
:keyword output_format:
900915
Optional. Defines the output serialization for the data stream. By default the data will be returned
901-
as it is represented in the blob. By providing an output format, the blob data will be reformatted
902-
according to that profile. This value can be a DelimitedTextDialect or a DelimitedJsonDialect.
903-
:paramtype output_format: ~azure.storage.blob.DelimitedTextDialect, ~azure.storage.blob.DelimitedJsonDialect
904-
or list[~azure.storage.blob.ArrowDialect]
916+
as it is represented in the blob (Parquet formats default to DelimitedTextDialect).
917+
By providing an output format, the blob data will be reformatted according to that profile.
918+
This value can be a DelimitedTextDialect or a DelimitedJsonDialect or ArrowDialect.
919+
These dialects can be passed through their respective classes, the QuickQueryDialect enum or as a string
920+
:paramtype output_format: ~azure.storage.blob.DelimitedTextDialect or ~azure.storage.blob.DelimitedJsonDialect
921+
or list[~azure.storage.blob.ArrowDialect] or ~azure.storage.blob.QuickQueryDialect or str
905922
:keyword lease:
906923
Required if the blob has an active lease. Value can be a BlobLeaseClient object
907924
or the lease ID as a string.

sdk/storage/azure-storage-blob/azure/storage/blob/_models.py

Lines changed: 8 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -67,6 +67,14 @@ class PremiumPageBlobTier(str, Enum):
6767
P60 = 'P60' #: P60 Tier
6868

6969

70+
class QuickQueryDialect(str, Enum):
71+
"""Specifies the quick query input/output dialect."""
72+
73+
DelimitedTextDialect = 'DelimitedTextDialect'
74+
DelimitedJsonDialect = 'DelimitedJsonDialect'
75+
ParquetDialect = 'ParquetDialect'
76+
77+
7078
class SequenceNumberAction(str, Enum):
7179
"""Sequence number actions."""
7280

sdk/storage/azure-storage-blob/azure/storage/blob/_serialize.py

Lines changed: 7 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -163,7 +163,12 @@ def serialize_blob_tags(tags=None):
163163

164164

165165
def serialize_query_format(formater):
166-
if isinstance(formater, DelimitedJsonDialect):
166+
if formater == "ParquetDialect":
167+
qq_format = QueryFormat(
168+
type=QueryFormatType.PARQUET,
169+
parquet_text_configuration=' '
170+
)
171+
elif isinstance(formater, DelimitedJsonDialect):
167172
serialization_settings = JsonTextConfiguration(
168173
record_separator=formater.delimiter
169174
)
@@ -196,5 +201,5 @@ def serialize_query_format(formater):
196201
elif not formater:
197202
return None
198203
else:
199-
raise TypeError("Format must be DelimitedTextDialect or DelimitedJsonDialect.")
204+
raise TypeError("Format must be DelimitedTextDialect or DelimitedJsonDialect or ParquetDialect.")
200205
return QuerySerialization(format=qq_format)

sdk/storage/azure-storage-blob/tests/test_quick_query.py

Lines changed: 42 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -6,6 +6,7 @@
66
# license information.
77
# --------------------------------------------------------------------------
88
import base64
9+
import os
910

1011
import pytest
1112

@@ -15,11 +16,10 @@
1516
BlobServiceClient,
1617
DelimitedTextDialect,
1718
DelimitedJsonDialect,
18-
BlobQueryError
1919
)
2020

2121
# ------------------------------------------------------------------------------
22-
from azure.storage.blob._models import ArrowDialect, ArrowType
22+
from azure.storage.blob._models import ArrowDialect, ArrowType, QuickQueryDialect
2323

2424
CSV_DATA = b'Service,Package,Version,RepoPath,MissingDocs\r\nApp Configuration,' \
2525
b'azure-data-appconfiguration,1,appconfiguration,FALSE\r\nEvent Hubs' \
@@ -937,4 +937,44 @@ def on_error(error):
937937
on_error=on_error,
938938
blob_format=input_format)
939939

940+
@GlobalStorageAccountPreparer()
941+
def test_quick_query_input_in_parquet_format(self, resource_group, location, storage_account, storage_account_key):
942+
# Arrange
943+
bsc = BlobServiceClient(
944+
self.account_url(storage_account, "blob"),
945+
credential=storage_account_key)
946+
self._setup(bsc)
947+
expression = "select * from blobstorage where id < 1;"
948+
expected_data = b"0,mdifjt55.ea3,mdifjt55.ea3\n"
949+
950+
blob_name = self._get_blob_reference()
951+
blob_client = bsc.get_blob_client(self.container_name, blob_name)
952+
parquet_path = os.path.abspath(os.path.join(os.path.abspath(__file__), "..", "./resources/parquet.parquet"))
953+
with open(parquet_path, "rb") as parquet_data:
954+
blob_client.upload_blob(parquet_data, overwrite=True)
955+
956+
reader = blob_client.query_blob(expression, blob_format=QuickQueryDialect.ParquetDialect)
957+
real_data = reader.readall()
958+
959+
self.assertEqual(real_data, expected_data)
960+
961+
@GlobalStorageAccountPreparer()
962+
def test_quick_query_output_in_parquet_format(self, resource_group, location, storage_account, storage_account_key):
963+
# Arrange
964+
bsc = BlobServiceClient(
965+
self.account_url(storage_account, "blob"),
966+
credential=storage_account_key)
967+
self._setup(bsc)
968+
expression = "SELECT * from BlobStorage"
969+
970+
blob_name = self._get_blob_reference()
971+
blob_client = bsc.get_blob_client(self.container_name, blob_name)
972+
parquet_path = os.path.abspath(os.path.join(os.path.abspath(__file__), "..", "./resources/parquet.parquet"))
973+
with open(parquet_path, "rb") as parquet_data:
974+
blob_client.upload_blob(parquet_data, overwrite=True)
975+
976+
with self.assertRaises(ValueError):
977+
blob_client.query_blob(
978+
expression, blob_format="ParquetDialect", output_format="ParquetDialect")
979+
940980
# ------------------------------------------------------------------------------

sdk/storage/azure-storage-file-datalake/azure/storage/filedatalake/__init__.py

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -31,6 +31,7 @@
3131
DelimitedJsonDialect,
3232
ArrowDialect,
3333
ArrowType,
34+
QuickQueryDialect,
3435
DataLakeFileQueryError,
3536
AccessControlChangeResult,
3637
AccessControlChangeCounters,
@@ -93,6 +94,7 @@
9394
'DataLakeFileQueryError',
9495
'ArrowDialect',
9596
'ArrowType',
97+
'QuickQueryDialect',
9698
'DataLakeFileQueryError',
9799
'AnalyticsLogging',
98100
'Metrics',

sdk/storage/azure-storage-file-datalake/azure/storage/filedatalake/_data_lake_file_client.py

Lines changed: 10 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -724,16 +724,20 @@ def query_file(self, query_expression, **kwargs):
724724
:keyword file_format:
725725
Optional. Defines the serialization of the data currently stored in the file. The default is to
726726
treat the file data as CSV data formatted in the default dialect. This can be overridden with
727-
a custom DelimitedTextDialect, or alternatively a DelimitedJsonDialect.
727+
a custom DelimitedTextDialect, or DelimitedJsonDialect or "ParquetDialect" (passed as a string or enum).
728+
These dialects can be passed through their respective classes, the QuickQueryDialect enum or as a string.
728729
:paramtype file_format:
729-
~azure.storage.filedatalake.DelimitedTextDialect or ~azure.storage.filedatalake.DelimitedJsonDialect
730+
~azure.storage.filedatalake.DelimitedTextDialect or ~azure.storage.filedatalake.DelimitedJsonDialect or
731+
~azure.storage.filedatalake.QuickQueryDialect or str
730732
:keyword output_format:
731733
Optional. Defines the output serialization for the data stream. By default the data will be returned
732-
as it is represented in the file. By providing an output format, the file data will be reformatted
733-
according to that profile. This value can be a DelimitedTextDialect or a DelimitedJsonDialect.
734+
as it is represented in the file. By providing an output format,
735+
the file data will be reformatted according to that profile.
736+
This value can be a DelimitedTextDialect or a DelimitedJsonDialect or ArrowDialect.
737+
These dialects can be passed through their respective classes, the QuickQueryDialect enum or as a string.
734738
:paramtype output_format:
735-
~azure.storage.filedatalake.DelimitedTextDialect, ~azure.storage.filedatalake.DelimitedJsonDialect
736-
or list[~azure.storage.filedatalake.ArrowDialect]
739+
~azure.storage.filedatalake.DelimitedTextDialect or ~azure.storage.filedatalake.DelimitedJsonDialect
740+
or list[~azure.storage.filedatalake.ArrowDialect] or ~azure.storage.filedatalake.QuickQueryDialect or str
737741
:keyword lease:
738742
Required if the file has an active lease. Value can be a DataLakeLeaseClient object
739743
or the lease ID as a string.

sdk/storage/azure-storage-file-datalake/azure/storage/filedatalake/_models.py

Lines changed: 8 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -721,6 +721,14 @@ class ArrowDialect(BlobArrowDialect):
721721
"""
722722

723723

724+
class QuickQueryDialect(str, Enum):
725+
"""Specifies the quick query input/output dialect."""
726+
727+
DelimitedTextDialect = 'DelimitedTextDialect'
728+
DelimitedJsonDialect = 'DelimitedJsonDialect'
729+
ParquetDialect = 'ParquetDialect'
730+
731+
724732
class ArrowType(str, Enum):
725733

726734
INT64 = "int64"

sdk/storage/azure-storage-file-datalake/setup.py

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -95,7 +95,7 @@
9595
install_requires=[
9696
"azure-core<2.0.0,>=1.10.0",
9797
"msrest>=0.6.21",
98-
"azure-storage-blob<13.0.0,>=12.8.0b1"
98+
"azure-storage-blob<13.0.0,>=12.9.0b1"
9999
],
100100
extras_require={
101101
":python_version<'3.0'": ['futures', 'azure-storage-nspkg<4.0.0,>=3.0.0'],

sdk/storage/azure-storage-file-datalake/tests/test_quick_query.py

Lines changed: 38 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -6,14 +6,14 @@
66
# license information.
77
# --------------------------------------------------------------------------
88
import base64
9+
import os
910

1011
import pytest
1112

1213
from azure.storage.filedatalake import (
1314
DelimitedTextDialect,
1415
DelimitedJsonDialect,
15-
DataLakeFileQueryError,
16-
ArrowDialect, ArrowType)
16+
ArrowDialect, ArrowType, QuickQueryDialect)
1717

1818
from testcase import (
1919
StorageTestCase,
@@ -866,4 +866,40 @@ def on_error(error):
866866
on_error=on_error,
867867
file_format=input_format)
868868

869+
@DataLakePreparer()
870+
def test_quick_query_input_in_parquet_format(self, datalake_storage_account_name, datalake_storage_account_key):
871+
# Arrange
872+
self._setUp(datalake_storage_account_name, datalake_storage_account_key)
873+
file_name = self._get_file_reference()
874+
file_client = self.dsc.get_file_client(self.filesystem_name, file_name)
875+
876+
expression = "select * from blobstorage where id < 1;"
877+
expected_data = b"0,mdifjt55.ea3,mdifjt55.ea3\n"
878+
879+
parquet_path = os.path.abspath(os.path.join(os.path.abspath(__file__), "..", "./resources/parquet.parquet"))
880+
with open(parquet_path, "rb") as parquet_data:
881+
file_client.upload_data(parquet_data, overwrite=True)
882+
883+
reader = file_client.query_file(expression, file_format=QuickQueryDialect.ParquetDialect)
884+
real_data = reader.readall()
885+
886+
self.assertEqual(real_data, expected_data)
887+
888+
@DataLakePreparer()
889+
def test_quick_query_output_in_parquet_format(self, datalake_storage_account_name, datalake_storage_account_key):
890+
# Arrange
891+
self._setUp(datalake_storage_account_name, datalake_storage_account_key)
892+
file_name = self._get_file_reference()
893+
file_client = self.dsc.get_file_client(self.filesystem_name, file_name)
894+
895+
expression = "SELECT * from BlobStorage"
896+
parquet_path = os.path.abspath(os.path.join(os.path.abspath(__file__), "..", "./resources/parquet.parquet"))
897+
with open(parquet_path, "rb") as parquet_data:
898+
file_client.upload_data(parquet_data, overwrite=True)
899+
900+
with self.assertRaises(ValueError):
901+
file_client.query_file(
902+
expression, file_format=QuickQueryDialect.ParquetDialect,
903+
output_format=QuickQueryDialect.ParquetDialect)
904+
869905
# ------------------------------------------------------------------------------

0 commit comments

Comments
 (0)