diff --git a/python/pyarrow/_dataset.pyx b/python/pyarrow/_dataset.pyx
index e10dfb90540..ca64cea1756 100644
--- a/python/pyarrow/_dataset.pyx
+++ b/python/pyarrow/_dataset.pyx
@@ -1215,6 +1215,9 @@ cdef class ParquetFileWriteOptions(FileWriteOptions):
                 self._properties["allow_truncated_timestamps"]
             ),
             writer_engine_version="V2",
+            use_compliant_nested_type=(
+                self._properties["use_compliant_nested_type"]
+            )
         )
 
     cdef void init(self, const shared_ptr[CFileWriteOptions]& sp):
@@ -1232,6 +1235,7 @@ cdef class ParquetFileWriteOptions(FileWriteOptions):
             use_deprecated_int96_timestamps=False,
             coerce_timestamps=None,
             allow_truncated_timestamps=False,
+            use_compliant_nested_type=False,
         )
         self._set_properties()
         self._set_arrow_properties()
diff --git a/python/pyarrow/_parquet.pxd b/python/pyarrow/_parquet.pxd
index 8c0f5a9fcec..e97f32411e6 100644
--- a/python/pyarrow/_parquet.pxd
+++ b/python/pyarrow/_parquet.pxd
@@ -382,6 +382,8 @@ cdef extern from "parquet/api/writer.h" namespace "parquet" nogil:
             Builder* allow_truncated_timestamps()
             Builder* disallow_truncated_timestamps()
             Builder* store_schema()
+            Builder* enable_compliant_nested_types()
+            Builder* disable_compliant_nested_types()
             Builder* set_engine_version(ArrowWriterEngineVersion version)
             shared_ptr[ArrowWriterProperties] build()
         c_bool support_deprecated_int96_timestamps()
@@ -501,7 +503,8 @@ cdef shared_ptr[ArrowWriterProperties] _create_arrow_writer_properties(
    use_deprecated_int96_timestamps=*,
    coerce_timestamps=*,
    allow_truncated_timestamps=*,
-    writer_engine_version=*) except *
+    writer_engine_version=*,
+    use_compliant_nested_type=*) except *
 
 cdef class ParquetSchema(_Weakrefable):
     cdef:
diff --git a/python/pyarrow/_parquet.pyx b/python/pyarrow/_parquet.pyx
index 53bbee63165..ce16acc6890 100644
--- a/python/pyarrow/_parquet.pyx
+++ b/python/pyarrow/_parquet.pyx
@@ -1265,7 +1265,8 @@ cdef shared_ptr[ArrowWriterProperties] _create_arrow_writer_properties(
         use_deprecated_int96_timestamps=False,
         coerce_timestamps=None,
         allow_truncated_timestamps=False,
-        writer_engine_version=None) except *:
+        writer_engine_version=None,
+        use_compliant_nested_type=False) except *:
     """Arrow writer properties"""
     cdef:
         shared_ptr[ArrowWriterProperties] arrow_properties
@@ -1299,6 +1300,13 @@
     else:
         arrow_props.disallow_truncated_timestamps()
 
+    # use_compliant_nested_type
+
+    if use_compliant_nested_type:
+        arrow_props.enable_compliant_nested_types()
+    else:
+        arrow_props.disable_compliant_nested_types()
+
     # writer_engine_version
 
     if writer_engine_version == "V1":
@@ -1328,6 +1336,7 @@ cdef class ParquetWriter(_Weakrefable):
         object compression
         object compression_level
         object data_page_version
+        object use_compliant_nested_type
         object version
         object write_statistics
         object writer_engine_version
@@ -1345,7 +1354,8 @@
                  compression_level=None,
                  use_byte_stream_split=False,
                  writer_engine_version=None,
-                 data_page_version=None):
+                 data_page_version=None,
+                 use_compliant_nested_type=False):
         cdef:
             shared_ptr[WriterProperties] properties
             shared_ptr[ArrowWriterProperties] arrow_properties
@@ -1377,7 +1387,8 @@
             use_deprecated_int96_timestamps=use_deprecated_int96_timestamps,
             coerce_timestamps=coerce_timestamps,
             allow_truncated_timestamps=allow_truncated_timestamps,
-            writer_engine_version=writer_engine_version
+            writer_engine_version=writer_engine_version,
+            use_compliant_nested_type=use_compliant_nested_type
         )
 
         pool = maybe_unbox_memory_pool(memory_pool)
diff --git a/python/pyarrow/parquet.py b/python/pyarrow/parquet.py
index a15dc65290c..09054978fb4 100644
--- a/python/pyarrow/parquet.py
+++ b/python/pyarrow/parquet.py
@@ -541,6 +541,31 @@ def _sanitize_table(table, new_schema, flavor):
     The serialized Parquet data page format version to write, defaults to
     1.0. This does not impact the file schema logical types and Arrow to
     Parquet type casting behavior; for that use the "version" option.
+use_compliant_nested_type: bool, default False
+    Whether to write compliant Parquet nested type (lists) as defined
+    `here <https://github.com/apache/parquet-format/blob/master/LogicalTypes.md#nested-types>`_,
+    defaults to ``False``.
+    For ``use_compliant_nested_type=True``, this will write into a list
+    with 3-level structure where the middle level, named ``list``,
+    is a repeated group with a single field named ``element``
+    ::
+
+        <list-repetition> group <name> (LIST) {
+            repeated group list {
+                <element-repetition> <element-type> element;
+            }
+        }
+
+    For ``use_compliant_nested_type=False``, this will also write into a list
+    with 3-level structure, where the name of the single field of the middle
+    level ``list`` is taken from the element name for nested columns in Arrow,
+    which defaults to ``item``
+    ::
+
+        <list-repetition> group <name> (LIST) {
+            repeated group list {
+                <element-repetition> <element-type> item;
+            }
+        }
 """
@@ -572,6 +597,7 @@ def __init__(self, where, schema, filesystem=None,
                  use_byte_stream_split=False,
                  writer_engine_version=None,
                  data_page_version='1.0',
+                 use_compliant_nested_type=False,
                  **options):
         if use_deprecated_int96_timestamps is None:
             # Use int96 timestamps for Spark
@@ -622,6 +648,7 @@
             use_byte_stream_split=use_byte_stream_split,
             writer_engine_version=engine_version,
             data_page_version=data_page_version,
+            use_compliant_nested_type=use_compliant_nested_type,
             **options)
         self.is_open = True
 
@@ -1775,6 +1802,7 @@ def write_table(table, where, row_group_size=None, version='1.0',
                 compression_level=None,
                 use_byte_stream_split=False,
                 data_page_version='1.0',
+                use_compliant_nested_type=False,
                 **kwargs):
     row_group_size = kwargs.pop('chunk_size', row_group_size)
     use_int96 = use_deprecated_int96_timestamps
@@ -1794,6 +1822,7 @@
                 compression_level=compression_level,
                 use_byte_stream_split=use_byte_stream_split,
                 data_page_version=data_page_version,
+                use_compliant_nested_type=use_compliant_nested_type,
                 **kwargs) as writer:
             writer.write_table(table, row_group_size=row_group_size)
     except Exception:
diff --git a/python/pyarrow/tests/parquet/test_compliant_nested_type.py b/python/pyarrow/tests/parquet/test_compliant_nested_type.py
new file mode 100644
index 00000000000..804f3738f12
--- /dev/null
+++ b/python/pyarrow/tests/parquet/test_compliant_nested_type.py
@@ -0,0 +1,113 @@
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements. See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership. The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License. You may obtain a copy of the License at
+#
+#   http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied. See the License for the
+# specific language governing permissions and limitations
+# under the License.
+
+import pytest
+
+import pyarrow as pa
+from pyarrow.tests.parquet.common import parametrize_legacy_dataset
+
+try:
+    import pyarrow.parquet as pq
+    from pyarrow.tests.parquet.common import (_read_table,
+                                              _check_roundtrip)
+except ImportError:
+    pq = None
+
+try:
+    import pandas as pd
+    import pandas.testing as tm
+
+    from pyarrow.tests.parquet.common import _roundtrip_pandas_dataframe
+except ImportError:
+    pd = tm = None
+
+# Tests for ARROW-11497
+_test_data_simple = [
+    {'items': [1, 2]},
+    {'items': [0]},
+]
+
+_test_data_complex = [
+    {'items': [{'name': 'elem1', 'value': '1'},
+               {'name': 'elem2', 'value': '2'}]},
+    {'items': [{'name': 'elem1', 'value': '0'}]},
+]
+
+parametrize_test_data = pytest.mark.parametrize(
+    "test_data", [_test_data_simple, _test_data_complex])
+
+
+@pytest.mark.pandas
+@parametrize_legacy_dataset
+@parametrize_test_data
+def test_write_compliant_nested_type_enable(tempdir,
+                                            use_legacy_dataset, test_data):
+    # prepare dataframe for testing
+    df = pd.DataFrame(data=test_data)
+    # verify that we can read/write a pandas df with the new flag
+    _roundtrip_pandas_dataframe(df,
+                                write_kwargs={
+                                    'use_compliant_nested_type': True},
+                                use_legacy_dataset=use_legacy_dataset)
+
+    # Write to a parquet file with compliant nested type
+    table = pa.Table.from_pandas(df, preserve_index=False)
+    path = str(tempdir / 'data.parquet')
+    with pq.ParquetWriter(path, table.schema,
+                          use_compliant_nested_type=True,
+                          version='2.0') as writer:
+        writer.write_table(table)
+    # Read back as a table
+    new_table = _read_table(path)
+    # Validate that the "items" column is compliant with the Parquet nested
+    # format: it should read back as list<element: ...>
+    assert isinstance(new_table.schema.types[0], pa.ListType)
+    assert new_table.schema.types[0].value_field.name == 'element'
+
+    # Verify that the new table can be read/written correctly
+    _check_roundtrip(new_table,
+                     use_legacy_dataset=use_legacy_dataset,
+                     use_compliant_nested_type=True)
+
+
+@pytest.mark.pandas
+@parametrize_legacy_dataset
+@parametrize_test_data
+def test_write_compliant_nested_type_disable(tempdir,
+                                             use_legacy_dataset, test_data):
+    # prepare dataframe for testing
+    df = pd.DataFrame(data=test_data)
+    # verify that we can read/write with the new flag disabled
+    # (default behaviour)
+    _roundtrip_pandas_dataframe(df, write_kwargs={},
+                                use_legacy_dataset=use_legacy_dataset)
+
+    # Write to a parquet file while disabling compliant nested type
+    table = pa.Table.from_pandas(df, preserve_index=False)
+    path = str(tempdir / 'data.parquet')
+    with pq.ParquetWriter(path, table.schema, version='2.0') as writer:
+        writer.write_table(table)
+    new_table = _read_table(path)
+
+    # Validate that the "items" column is not compliant with the Parquet
+    # nested format: it should read back as list<item: ...>
+    assert isinstance(new_table.schema.types[0], pa.ListType)
+    assert new_table.schema.types[0].value_field.name == 'item'
+
+    # Verify that the new table can be read/written correctly
+    _check_roundtrip(new_table,
+                     use_legacy_dataset=use_legacy_dataset,
+                     use_compliant_nested_type=False)
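
For a quick end-to-end look at what the new flag changes, here is a minimal sketch (not part of the diff; file paths and example data are illustrative, and `version='2.0'` simply mirrors the tests above):

```python
# Minimal sketch: write the same list column with and without the new flag
# and compare the list field names on read-back.
import pyarrow as pa
import pyarrow.parquet as pq

# A single list<int64> column, matching _test_data_simple.
table = pa.table({'items': pa.array([[1, 2], [0]],
                                    type=pa.list_(pa.int64()))})

pq.write_table(table, '/tmp/compliant.parquet',
               use_compliant_nested_type=True, version='2.0')
pq.write_table(table, '/tmp/default.parquet', version='2.0')

# With the flag, the middle level of the 3-level LIST group carries the
# spec-compliant field name "element"; the default keeps Arrow's "item".
print(pq.read_table('/tmp/compliant.parquet').schema.field('items').type)
# -> list<element: int64>
print(pq.read_table('/tmp/default.parquet').schema.field('items').type)
# -> list<item: int64>
```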