4 changes: 4 additions & 0 deletions python/pyarrow/_dataset.pyx
@@ -1215,6 +1215,9 @@ cdef class ParquetFileWriteOptions(FileWriteOptions):
self._properties["allow_truncated_timestamps"]
),
writer_engine_version="V2",
use_compliant_nested_type=(
self._properties["use_compliant_nested_type"]
)
)

cdef void init(self, const shared_ptr[CFileWriteOptions]& sp):
@@ -1232,6 +1235,7 @@ cdef class ParquetFileWriteOptions(FileWriteOptions):
use_deprecated_int96_timestamps=False,
coerce_timestamps=None,
allow_truncated_timestamps=False,
use_compliant_nested_type=False,
)
self._set_properties()
self._set_arrow_properties()
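The hunk above threads the new flag through ParquetFileWriteOptions, so the datasets writer picks it up alongside the existing Arrow writer properties. A minimal sketch of exercising it from the datasets API, assuming a build with this change applied (the table contents and output path are illustrative only):

import pyarrow as pa
import pyarrow.dataset as ds

# A table with a nested list-of-struct column, similar to the test data below.
table = pa.table({"items": [[{"name": "elem1", "value": "1"}],
                            [{"name": "elem2", "value": "2"}]]})

# The new key is in the _set_properties() defaults, so make_write_options()
# should accept it; True switches the list layout to the compliant form.
fmt = ds.ParquetFileFormat()
opts = fmt.make_write_options(use_compliant_nested_type=True)

ds.write_dataset(table, "/tmp/compliant_dataset", format=fmt, file_options=opts)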
5 changes: 4 additions & 1 deletion python/pyarrow/_parquet.pxd
@@ -382,6 +382,8 @@ cdef extern from "parquet/api/writer.h" namespace "parquet" nogil:
Builder* allow_truncated_timestamps()
Builder* disallow_truncated_timestamps()
Builder* store_schema()
Builder* enable_compliant_nested_types()
Builder* disable_compliant_nested_types()
Builder* set_engine_version(ArrowWriterEngineVersion version)
shared_ptr[ArrowWriterProperties] build()
c_bool support_deprecated_int96_timestamps()
@@ -501,7 +503,8 @@ cdef shared_ptr[ArrowWriterProperties] _create_arrow_writer_properties(
use_deprecated_int96_timestamps=*,
coerce_timestamps=*,
allow_truncated_timestamps=*,
writer_engine_version=*) except *
writer_engine_version=*,
use_compliant_nested_type=*) except *

cdef class ParquetSchema(_Weakrefable):
cdef:
17 changes: 14 additions & 3 deletions python/pyarrow/_parquet.pyx
@@ -1265,7 +1265,8 @@ cdef shared_ptr[ArrowWriterProperties] _create_arrow_writer_properties(
use_deprecated_int96_timestamps=False,
coerce_timestamps=None,
allow_truncated_timestamps=False,
writer_engine_version=None) except *:
writer_engine_version=None,
use_compliant_nested_type=False) except *:
"""Arrow writer properties"""
cdef:
shared_ptr[ArrowWriterProperties] arrow_properties
@@ -1299,6 +1300,13 @@ cdef shared_ptr[ArrowWriterProperties] _create_arrow_writer_properties(
else:
arrow_props.disallow_truncated_timestamps()

# use_compliant_nested_type

if use_compliant_nested_type:
arrow_props.enable_compliant_nested_types()
else:
arrow_props.disable_compliant_nested_types()

# writer_engine_version

if writer_engine_version == "V1":
@@ -1328,6 +1336,7 @@ cdef class ParquetWriter(_Weakrefable):
object compression
object compression_level
object data_page_version
object use_compliant_nested_type
object version
object write_statistics
object writer_engine_version
@@ -1345,7 +1354,8 @@
compression_level=None,
use_byte_stream_split=False,
writer_engine_version=None,
data_page_version=None):
data_page_version=None,
use_compliant_nested_type=False):
cdef:
shared_ptr[WriterProperties] properties
shared_ptr[ArrowWriterProperties] arrow_properties
@@ -1377,7 +1387,8 @@ cdef class ParquetWriter(_Weakrefable):
use_deprecated_int96_timestamps=use_deprecated_int96_timestamps,
coerce_timestamps=coerce_timestamps,
allow_truncated_timestamps=allow_truncated_timestamps,
writer_engine_version=writer_engine_version
writer_engine_version=writer_engine_version,
use_compliant_nested_type=use_compliant_nested_type
)

pool = maybe_unbox_memory_pool(memory_pool)
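On the Cython side the keyword is simply forwarded into _create_arrow_writer_properties(), which toggles enable_compliant_nested_types() / disable_compliant_nested_types() on the Arrow writer properties builder. A hedged sketch of driving it through the low-level writer, mirroring the test added below (the path and data are placeholders):

import pyarrow as pa
import pyarrow.parquet as pq

table = pa.table({"items": [[1, 2], [0]]})

# With the flag enabled, the repeated middle group's child field is written as
# "element"; with the default (False) it keeps Arrow's default child name "item".
with pq.ParquetWriter("/tmp/compliant.parquet", table.schema,
                      use_compliant_nested_type=True) as writer:
    writer.write_table(table)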
29 changes: 29 additions & 0 deletions python/pyarrow/parquet.py
@@ -541,6 +541,31 @@ def _sanitize_table(table, new_schema, flavor):
The serialized Parquet data page format version to write, defaults to
1.0. This does not impact the file schema logical types and Arrow to
Parquet type casting behavior; for that use the "version" option.
use_compliant_nested_type : bool, default False
Whether to write compliant Parquet nested type (lists) as defined
`here <https://github.com/apache/parquet-format/blob/master/
LogicalTypes.md#nested-types>`_, defaults to ``False``.
For ``use_compliant_nested_type=True``, this will write into a list
with 3-level structure where the middle level, named ``list``,
is a repeated group with a single field named ``element``
::
<list-repetition> group <name> (LIST) {
repeated group list {
<element-repetition> <element-type> element;
}
}

For ``use_compliant_nested_type=False``, this will also write into a list
with 3-level structure, where the name of the single field of the middle
level ``list`` is taken from the element name for nested columns in Arrow,
which defaults to ``item``
::
<list-repetition> group <name> (LIST) {
repeated group list {
<element-repetition> <element-type> item;
}
}

"""


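To make the two layouts described in the docstring concrete, a small usage sketch via write_table(), assuming this branch (file paths are illustrative; the schema check mirrors the new tests):

import pyarrow as pa
import pyarrow.parquet as pq

table = pa.table({"items": [[1, 2], [0]]})

# Compliant layout: the list's child field is named "element".
pq.write_table(table, "/tmp/compliant.parquet", use_compliant_nested_type=True)
assert pq.read_table("/tmp/compliant.parquet").schema.types[0].value_field.name == "element"

# Default layout: the child field keeps Arrow's default name "item".
pq.write_table(table, "/tmp/legacy.parquet")
assert pq.read_table("/tmp/legacy.parquet").schema.types[0].value_field.name == "item"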
@@ -572,6 +597,7 @@ def __init__(self, where, schema, filesystem=None,
use_byte_stream_split=False,
writer_engine_version=None,
data_page_version='1.0',
use_compliant_nested_type=False,
**options):
if use_deprecated_int96_timestamps is None:
# Use int96 timestamps for Spark
@@ -622,6 +648,7 @@ def __init__(self, where, schema, filesystem=None,
use_byte_stream_split=use_byte_stream_split,
writer_engine_version=engine_version,
data_page_version=data_page_version,
use_compliant_nested_type=use_compliant_nested_type,
**options)
self.is_open = True

@@ -1775,6 +1802,7 @@ def write_table(table, where, row_group_size=None, version='1.0',
compression_level=None,
use_byte_stream_split=False,
data_page_version='1.0',
use_compliant_nested_type=False,
**kwargs):
row_group_size = kwargs.pop('chunk_size', row_group_size)
use_int96 = use_deprecated_int96_timestamps
@@ -1794,6 +1822,7 @@
compression_level=compression_level,
use_byte_stream_split=use_byte_stream_split,
data_page_version=data_page_version,
use_compliant_nested_type=use_compliant_nested_type,
**kwargs) as writer:
writer.write_table(table, row_group_size=row_group_size)
except Exception:
113 changes: 113 additions & 0 deletions python/pyarrow/tests/parquet/test_compliant_nested_type.py
@@ -0,0 +1,113 @@
# Licensed to the Apache Software Foundation (ASF) under one
# or more contributor license agreements. See the NOTICE file
# distributed with this work for additional information
# regarding copyright ownership. The ASF licenses this file
# to you under the Apache License, Version 2.0 (the
# "License"); you may not use this file except in compliance
# with the License. You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing,
# software distributed under the License is distributed on an
# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
# KIND, either express or implied. See the License for the
# specific language governing permissions and limitations
# under the License.

import pytest

import pyarrow as pa
from pyarrow.tests.parquet.common import parametrize_legacy_dataset

try:
import pyarrow.parquet as pq
from pyarrow.tests.parquet.common import (_read_table,
_check_roundtrip)
except ImportError:
pq = None

try:
import pandas as pd
import pandas.testing as tm

from pyarrow.tests.parquet.common import _roundtrip_pandas_dataframe
except ImportError:
pd = tm = None

# Tests for ARROW-11497
_test_data_simple = [
{'items': [1, 2]},
{'items': [0]},
]

_test_data_complex = [
{'items': [{'name': 'elem1', 'value': '1'},
{'name': 'elem2', 'value': '2'}]},
{'items': [{'name': 'elem1', 'value': '0'}]},
]

parametrize_test_data = pytest.mark.parametrize(
"test_data", [_test_data_simple, _test_data_complex])


@pytest.mark.pandas
@parametrize_legacy_dataset
@parametrize_test_data
def test_write_compliant_nested_type_enable(tempdir,
use_legacy_dataset, test_data):
# prepare dataframe for testing
df = pd.DataFrame(data=test_data)
# verify that we can read/write a pandas df with the new flag
_roundtrip_pandas_dataframe(df,
write_kwargs={
'use_compliant_nested_type': True},
use_legacy_dataset=use_legacy_dataset)

# Write to a parquet file with compliant nested type
table = pa.Table.from_pandas(df, preserve_index=False)
path = str(tempdir / 'data.parquet')
with pq.ParquetWriter(path, table.schema,
use_compliant_nested_type=True,
version='2.0') as writer:
writer.write_table(table)
# Read back as a table
new_table = _read_table(path)
# Validate that "items" columns compliant to Parquet nested format
# Should be like this: list<element: struct<name: string, value: string>>
assert isinstance(new_table.schema.types[0], pa.ListType)
assert new_table.schema.types[0].value_field.name == 'element'

# Verify that the new table can be read/written correctly
_check_roundtrip(new_table,
use_legacy_dataset=use_legacy_dataset,
use_compliant_nested_type=True)


@pytest.mark.pandas
@parametrize_legacy_dataset
@parametrize_test_data
def test_write_compliant_nested_type_disable(tempdir,
use_legacy_dataset, test_data):
# prepare dataframe for testing
df = pd.DataFrame(data=test_data)
# verify that we can read/write with the new flag disabled (default behaviour)
_roundtrip_pandas_dataframe(df, write_kwargs={},
use_legacy_dataset=use_legacy_dataset)

# Write to a parquet file while disabling compliant nested type
table = pa.Table.from_pandas(df, preserve_index=False)
path = str(tempdir / 'data.parquet')
with pq.ParquetWriter(path, table.schema, version='2.0') as writer:
writer.write_table(table)
new_table = _read_table(path)

# Validate that "items" columns is not compliant to Parquet nested format
# Should be like this: list<item: struct<name: string, value: string>>
assert isinstance(new_table.schema.types[0], pa.ListType)
assert new_table.schema.types[0].value_field.name == 'item'

# Verify that the new table can be read/written correctly
_check_roundtrip(new_table,
use_legacy_dataset=use_legacy_dataset,
use_compliant_nested_type=False)