From 87e436d9fda380633a8083076c9523380c2a7948 Mon Sep 17 00:00:00 2001 From: Sung Yun <107272191+syun64@users.noreply.github.com> Date: Wed, 31 Jul 2024 12:51:03 +0000 Subject: [PATCH 1/9] upyarrow IO property for configuring large v small types on read --- pyiceberg/io/__init__.py | 1 + pyiceberg/io/pyarrow.py | 55 ++++++++++++++++++++++++++++++-- tests/io/test_pyarrow_visitor.py | 6 ++++ 3 files changed, 60 insertions(+), 2 deletions(-) diff --git a/pyiceberg/io/__init__.py b/pyiceberg/io/__init__.py index d200874741..0567af2d5d 100644 --- a/pyiceberg/io/__init__.py +++ b/pyiceberg/io/__init__.py @@ -80,6 +80,7 @@ GCS_ENDPOINT = "gcs.endpoint" GCS_DEFAULT_LOCATION = "gcs.default-bucket-location" GCS_VERSION_AWARE = "gcs.version-aware" +PYARROW_USE_LARGE_TYPES_ON_READ = "pyarrow.use-large-types-on-read" @runtime_checkable diff --git a/pyiceberg/io/pyarrow.py b/pyiceberg/io/pyarrow.py index f3b85eb499..13a4580866 100644 --- a/pyiceberg/io/pyarrow.py +++ b/pyiceberg/io/pyarrow.py @@ -95,6 +95,7 @@ HDFS_KERB_TICKET, HDFS_PORT, HDFS_USER, + PYARROW_USE_LARGE_TYPES_ON_READ, S3_ACCESS_KEY_ID, S3_CONNECT_TIMEOUT, S3_ENDPOINT, @@ -153,6 +154,7 @@ TimestamptzType, TimeType, UUIDType, + strtobool, ) from pyiceberg.utils.concurrent import ExecutorFactory from pyiceberg.utils.config import Config @@ -836,6 +838,10 @@ def _pyarrow_schema_ensure_large_types(schema: pa.Schema) -> pa.Schema: return visit_pyarrow(schema, _ConvertToLargeTypes()) +def _pyarrow_schema_ensure_small_types(schema: pa.Schema) -> pa.Schema: + return visit_pyarrow(schema, _ConvertToSmallTypes()) + + @singledispatch def visit_pyarrow(obj: Union[pa.DataType, pa.Schema], visitor: PyArrowSchemaVisitor[T]) -> T: """Apply a pyarrow schema visitor to any point within a schema. @@ -1146,6 +1152,30 @@ def primitive(self, primitive: pa.DataType) -> pa.DataType: return primitive +class _ConvertToSmallTypes(PyArrowSchemaVisitor[Union[pa.DataType, pa.Schema]]): + def schema(self, schema: pa.Schema, struct_result: pa.StructType) -> pa.Schema: + return pa.schema(struct_result) + + def struct(self, struct: pa.StructType, field_results: List[pa.Field]) -> pa.StructType: + return pa.struct(field_results) + + def field(self, field: pa.Field, field_result: pa.DataType) -> pa.Field: + return field.with_type(field_result) + + def list(self, list_type: pa.ListType, element_result: pa.DataType) -> pa.DataType: + return pa._list(element_result) + + def map(self, map_type: pa.MapType, key_result: pa.DataType, value_result: pa.DataType) -> pa.DataType: + return pa.map_(key_result, value_result) + + def primitive(self, primitive: pa.DataType) -> pa.DataType: + if primitive == pa.large_string(): + return pa.string() + elif primitive == pa.large_binary(): + return pa.binary() + return primitive + + class _ConvertToIcebergWithoutIDs(_ConvertToIceberg): """ Converts PyArrowSchema to Iceberg Schema with all -1 ids. 
@@ -1170,6 +1200,7 @@ def _task_to_record_batches( positional_deletes: Optional[List[ChunkedArray]], case_sensitive: bool, name_mapping: Optional[NameMapping] = None, + use_large_types: bool = True, ) -> Iterator[pa.RecordBatch]: _, _, path = PyArrowFileIO.parse_location(task.file.file_path) arrow_format = ds.ParquetFileFormat(pre_buffer=True, buffer_size=(ONE_MEGABYTE * 8)) @@ -1198,7 +1229,9 @@ def _task_to_record_batches( # https://github.com/apache/arrow/issues/41884 # https://github.com/apache/arrow/issues/43183 # Would be good to remove this later on - schema=_pyarrow_schema_ensure_large_types(physical_schema), + schema=_pyarrow_schema_ensure_large_types(physical_schema) + if use_large_types + else (_pyarrow_schema_ensure_small_types(physical_schema)), # This will push down the query to Arrow. # But in case there are positional deletes, we have to apply them first filter=pyarrow_filter if not positional_deletes else None, @@ -1233,10 +1266,19 @@ def _task_to_table( positional_deletes: Optional[List[ChunkedArray]], case_sensitive: bool, name_mapping: Optional[NameMapping] = None, + use_large_types: bool = True, ) -> Optional[pa.Table]: batches = list( _task_to_record_batches( - fs, task, bound_row_filter, projected_schema, projected_field_ids, positional_deletes, case_sensitive, name_mapping + fs, + task, + bound_row_filter, + projected_schema, + projected_field_ids, + positional_deletes, + case_sensitive, + name_mapping, + use_large_types, ) ) @@ -1304,6 +1346,10 @@ def project_table( # When FsSpec is not installed raise ValueError(f"Expected PyArrowFileIO or FsspecFileIO, got: {io}") from e + use_large_types = ( + val if isinstance(val := io.properties.get(PYARROW_USE_LARGE_TYPES_ON_READ, "True"), bool) else strtobool(val) + ) + bound_row_filter = bind(table_metadata.schema(), row_filter, case_sensitive=case_sensitive) projected_field_ids = { @@ -1323,6 +1369,7 @@ def project_table( deletes_per_file.get(task.file.file_path), case_sensitive, table_metadata.name_mapping(), + use_large_types, ) for task in tasks ] @@ -1395,6 +1442,9 @@ def project_batches( # When FsSpec is not installed raise ValueError(f"Expected PyArrowFileIO or FsspecFileIO, got: {io}") from e + use_large_types = ( + val if isinstance(val := io.properties.get(PYARROW_USE_LARGE_TYPES_ON_READ, "True"), bool) else strtobool(val) + ) bound_row_filter = bind(table_metadata.schema(), row_filter, case_sensitive=case_sensitive) projected_field_ids = { @@ -1415,6 +1465,7 @@ def project_batches( deletes_per_file.get(task.file.file_path), case_sensitive, table_metadata.name_mapping(), + use_large_types, ) for batch in batches: if limit is not None: diff --git a/tests/io/test_pyarrow_visitor.py b/tests/io/test_pyarrow_visitor.py index f0a2a45816..0ac821be78 100644 --- a/tests/io/test_pyarrow_visitor.py +++ b/tests/io/test_pyarrow_visitor.py @@ -40,6 +40,7 @@ _HasIds, _NullNaNUnmentionedTermsCollector, _pyarrow_schema_ensure_large_types, + _pyarrow_schema_ensure_small_types, pyarrow_to_schema, schema_to_pyarrow, visit_pyarrow, @@ -596,6 +597,11 @@ def test_pyarrow_schema_ensure_large_types(pyarrow_schema_nested_without_ids: pa assert _pyarrow_schema_ensure_large_types(pyarrow_schema_nested_without_ids) == expected_schema +def test_pyarrow_schema_ensure_small_types(pyarrow_schema_nested_without_ids: pa.Schema) -> None: + schema_with_large_types = _pyarrow_schema_ensure_small_types(pyarrow_schema_nested_without_ids) + assert _pyarrow_schema_ensure_small_types(schema_with_large_types) == pyarrow_schema_nested_without_ids + + 
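[Reviewer note] For context on the new _ConvertToSmallTypes visitor added above, here is a minimal standalone sketch of the primitive-level mapping it applies. The helper name downcast_primitive is illustrative only and not part of the patch; the visitor in the patch additionally rebuilds structs, lists and maps around these primitives.

import pyarrow as pa

def downcast_primitive(dtype: pa.DataType) -> pa.DataType:
    # Mirrors _ConvertToSmallTypes.primitive(): only the large string and
    # large binary variants are swapped; every other primitive passes
    # through unchanged.
    if dtype == pa.large_string():
        return pa.string()
    if dtype == pa.large_binary():
        return pa.binary()
    return dtype

assert downcast_primitive(pa.large_string()) == pa.string()
assert downcast_primitive(pa.large_binary()) == pa.binary()
assert downcast_primitive(pa.int64()) == pa.int64()
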
@pytest.fixture def bound_reference_str() -> BoundReference[Any]: return BoundReference( From 2d84784371621679f14ede61b4405e8787b0e0b6 Mon Sep 17 00:00:00 2001 From: Sung Yun <107272191+syun64@users.noreply.github.com> Date: Wed, 31 Jul 2024 16:50:59 +0000 Subject: [PATCH 2/9] tests --- pyiceberg/io/pyarrow.py | 10 +++--- tests/integration/test_reads.py | 59 ++++++++++++++++++++++++++++++++- 2 files changed, 64 insertions(+), 5 deletions(-) diff --git a/pyiceberg/io/pyarrow.py b/pyiceberg/io/pyarrow.py index 13a4580866..8efa04f8e4 100644 --- a/pyiceberg/io/pyarrow.py +++ b/pyiceberg/io/pyarrow.py @@ -883,8 +883,8 @@ def _(obj: Union[pa.ListType, pa.LargeListType, pa.FixedSizeListType], visitor: visitor.before_list_element(obj.value_field) result = visit_pyarrow(obj.value_type, visitor) visitor.after_list_element(obj.value_field) - - return visitor.list(obj, result) + ret = visitor.list(obj, result) + return ret @visit_pyarrow.register(pa.MapType) @@ -1163,7 +1163,8 @@ def field(self, field: pa.Field, field_result: pa.DataType) -> pa.Field: return field.with_type(field_result) def list(self, list_type: pa.ListType, element_result: pa.DataType) -> pa.DataType: - return pa._list(element_result) + print("DEBUG") + return pa.list_(element_result) def map(self, map_type: pa.MapType, key_result: pa.DataType, value_result: pa.DataType) -> pa.DataType: return pa.map_(key_result, value_result) @@ -1599,12 +1600,13 @@ def field(self, field: NestedField, _: Optional[pa.Array], field_array: Optional def list(self, list_type: ListType, list_array: Optional[pa.Array], value_array: Optional[pa.Array]) -> Optional[pa.Array]: if isinstance(list_array, (pa.ListArray, pa.LargeListArray, pa.FixedSizeListArray)) and value_array is not None: + list_initializer = pa.large_list if isinstance(list_array, pa.LargeListArray) else pa.list_ if isinstance(value_array, pa.StructArray): # This can be removed once this has been fixed: # https://github.com/apache/arrow/issues/38809 list_array = pa.LargeListArray.from_arrays(list_array.offsets, value_array) value_array = self._cast_if_needed(list_type.element_field, value_array) - arrow_field = pa.large_list(self._construct_field(list_type.element_field, value_array.type)) + arrow_field = list_initializer(self._construct_field(list_type.element_field, value_array.type)) return list_array.cast(arrow_field) else: return None diff --git a/tests/integration/test_reads.py b/tests/integration/test_reads.py index 078abf406a..4f43944210 100644 --- a/tests/integration/test_reads.py +++ b/tests/integration/test_reads.py @@ -40,7 +40,12 @@ NotEqualTo, NotNaN, ) -from pyiceberg.io.pyarrow import pyarrow_to_schema +from pyiceberg.io import PYARROW_USE_LARGE_TYPES_ON_READ +from pyiceberg.io.pyarrow import ( + _pyarrow_schema_ensure_large_types, + _pyarrow_schema_ensure_small_types, + pyarrow_to_schema, +) from pyiceberg.schema import Schema from pyiceberg.table import Table from pyiceberg.types import ( @@ -663,3 +668,55 @@ def another_task() -> None: table.transaction().set_properties(lock="xxx").commit_transaction() assert table.properties.get("lock") == "xxx" + + +@pytest.mark.integration +@pytest.mark.parametrize("catalog", [pytest.lazy_fixture("session_catalog_hive"), pytest.lazy_fixture("session_catalog")]) +def test_table_scan_with_large_types(catalog: Catalog) -> None: + identifier = "default.test_table_scan_with_large_types" + arrow_table = pa.Table.from_arrays( + [pa.array(["a", "b", "c"]), pa.array([b"a", b"b", b"c"]), pa.array([["a", "b"], ["c", "d"], ["e", "f"]])], + 
names=["string", "binary", "list"], + ) + + try: + catalog.drop_table(identifier) + except NoSuchTableError: + pass + + tbl = catalog.create_table( + identifier, + schema=arrow_table.schema, + ) + + tbl.append(arrow_table) + + result_table = tbl.scan().to_arrow() + + assert result_table.schema.equals(_pyarrow_schema_ensure_large_types(arrow_table.schema)) + + +@pytest.mark.integration +@pytest.mark.parametrize("catalog", [pytest.lazy_fixture("session_catalog_hive"), pytest.lazy_fixture("session_catalog")]) +def test_table_scan_with_small_types(catalog: Catalog) -> None: + identifier = "default.test_table_scan_with_small_types" + arrow_table = pa.Table.from_arrays( + [pa.array(["a", "b", "c"]), pa.array([b"a", b"b", b"c"]), pa.array([["a", "b"], ["c", "d"], ["e", "f"]])], + names=["string", "binary", "list"], + ) + + try: + catalog.drop_table(identifier) + except NoSuchTableError: + pass + + tbl = catalog.create_table( + identifier, + schema=arrow_table.schema, + ) + + tbl.append(arrow_table) + + tbl.io.properties[PYARROW_USE_LARGE_TYPES_ON_READ] = False + result_table = tbl.scan().to_arrow() + assert result_table.schema.equals(_pyarrow_schema_ensure_small_types(arrow_table.schema)) From a3eba4aef32f3d6b9d94277697441f62f1af914b Mon Sep 17 00:00:00 2001 From: Sung Yun <107272191+sungwy@users.noreply.github.com> Date: Thu, 1 Aug 2024 18:53:42 +0000 Subject: [PATCH 3/9] adopt feedback --- pyiceberg/io/pyarrow.py | 4 +--- tests/integration/test_reads.py | 25 +++++++++++++++++-------- tests/io/test_pyarrow_visitor.py | 4 ++-- 3 files changed, 20 insertions(+), 13 deletions(-) diff --git a/pyiceberg/io/pyarrow.py b/pyiceberg/io/pyarrow.py index 8efa04f8e4..6bf5e766d6 100644 --- a/pyiceberg/io/pyarrow.py +++ b/pyiceberg/io/pyarrow.py @@ -883,8 +883,7 @@ def _(obj: Union[pa.ListType, pa.LargeListType, pa.FixedSizeListType], visitor: visitor.before_list_element(obj.value_field) result = visit_pyarrow(obj.value_type, visitor) visitor.after_list_element(obj.value_field) - ret = visitor.list(obj, result) - return ret + return visitor.list(obj, result) @visit_pyarrow.register(pa.MapType) @@ -1163,7 +1162,6 @@ def field(self, field: pa.Field, field_result: pa.DataType) -> pa.Field: return field.with_type(field_result) def list(self, list_type: pa.ListType, element_result: pa.DataType) -> pa.DataType: - print("DEBUG") return pa.list_(element_result) def map(self, map_type: pa.MapType, key_result: pa.DataType, value_result: pa.DataType) -> pa.DataType: diff --git a/tests/integration/test_reads.py b/tests/integration/test_reads.py index 4f43944210..6e757f82b3 100644 --- a/tests/integration/test_reads.py +++ b/tests/integration/test_reads.py @@ -42,8 +42,6 @@ ) from pyiceberg.io import PYARROW_USE_LARGE_TYPES_ON_READ from pyiceberg.io.pyarrow import ( - _pyarrow_schema_ensure_large_types, - _pyarrow_schema_ensure_small_types, pyarrow_to_schema, ) from pyiceberg.schema import Schema @@ -672,8 +670,8 @@ def another_task() -> None: @pytest.mark.integration @pytest.mark.parametrize("catalog", [pytest.lazy_fixture("session_catalog_hive"), pytest.lazy_fixture("session_catalog")]) -def test_table_scan_with_large_types(catalog: Catalog) -> None: - identifier = "default.test_table_scan_with_large_types" +def test_table_scan_default_to_large_types(catalog: Catalog) -> None: + identifier = "default.test_table_scan_default_to_large_types" arrow_table = pa.Table.from_arrays( [pa.array(["a", "b", "c"]), pa.array([b"a", b"b", b"c"]), pa.array([["a", "b"], ["c", "d"], ["e", "f"]])], names=["string", "binary", "list"], @@ 
-693,13 +691,18 @@ def test_table_scan_with_large_types(catalog: Catalog) -> None: result_table = tbl.scan().to_arrow() - assert result_table.schema.equals(_pyarrow_schema_ensure_large_types(arrow_table.schema)) + expected_schema = pa.schema([ + pa.field("string", pa.large_string()), + pa.field("binary", pa.large_binary()), + pa.field("list", pa.large_list(pa.large_string())), + ]) + assert result_table.schema.equals(expected_schema) @pytest.mark.integration @pytest.mark.parametrize("catalog", [pytest.lazy_fixture("session_catalog_hive"), pytest.lazy_fixture("session_catalog")]) -def test_table_scan_with_small_types(catalog: Catalog) -> None: - identifier = "default.test_table_scan_with_small_types" +def test_table_scan_override_with_small_types(catalog: Catalog) -> None: + identifier = "default.test_table_scan_override_with_small_types" arrow_table = pa.Table.from_arrays( [pa.array(["a", "b", "c"]), pa.array([b"a", b"b", b"c"]), pa.array([["a", "b"], ["c", "d"], ["e", "f"]])], names=["string", "binary", "list"], @@ -719,4 +722,10 @@ def test_table_scan_with_small_types(catalog: Catalog) -> None: tbl.io.properties[PYARROW_USE_LARGE_TYPES_ON_READ] = False result_table = tbl.scan().to_arrow() - assert result_table.schema.equals(_pyarrow_schema_ensure_small_types(arrow_table.schema)) + + expected_schema = pa.schema([ + pa.field("string", pa.string()), + pa.field("binary", pa.binary()), + pa.field("list", pa.list_(pa.string())), + ]) + assert result_table.schema.equals(expected_schema) diff --git a/tests/io/test_pyarrow_visitor.py b/tests/io/test_pyarrow_visitor.py index 0ac821be78..9e6df720c6 100644 --- a/tests/io/test_pyarrow_visitor.py +++ b/tests/io/test_pyarrow_visitor.py @@ -597,8 +597,8 @@ def test_pyarrow_schema_ensure_large_types(pyarrow_schema_nested_without_ids: pa assert _pyarrow_schema_ensure_large_types(pyarrow_schema_nested_without_ids) == expected_schema -def test_pyarrow_schema_ensure_small_types(pyarrow_schema_nested_without_ids: pa.Schema) -> None: - schema_with_large_types = _pyarrow_schema_ensure_small_types(pyarrow_schema_nested_without_ids) +def test_pyarrow_schema_round_trip_ensure_large_types_and_then_small_types(pyarrow_schema_nested_without_ids: pa.Schema) -> None: + schema_with_large_types = _pyarrow_schema_ensure_large_types(pyarrow_schema_nested_without_ids) assert _pyarrow_schema_ensure_small_types(schema_with_large_types) == pyarrow_schema_nested_without_ids From f082eb52989d8142815dd4740c9d512161381158 Mon Sep 17 00:00:00 2001 From: Sung Yun <107272191+sungwy@users.noreply.github.com> Date: Fri, 2 Aug 2024 13:51:09 +0000 Subject: [PATCH 4/9] use property_as_bool --- pyiceberg/io/pyarrow.py | 12 ++++-------- 1 file changed, 4 insertions(+), 8 deletions(-) diff --git a/pyiceberg/io/pyarrow.py b/pyiceberg/io/pyarrow.py index beee6d36bd..1a9601ab6a 100644 --- a/pyiceberg/io/pyarrow.py +++ b/pyiceberg/io/pyarrow.py @@ -154,13 +154,12 @@ TimestamptzType, TimeType, UUIDType, - strtobool, ) from pyiceberg.utils.concurrent import ExecutorFactory from pyiceberg.utils.config import Config from pyiceberg.utils.datetime import millis_to_datetime from pyiceberg.utils.deprecated import deprecated -from pyiceberg.utils.properties import get_first_property_value, property_as_int +from pyiceberg.utils.properties import get_first_property_value, property_as_bool, property_as_int from pyiceberg.utils.singleton import Singleton from pyiceberg.utils.truncate import truncate_upper_bound_binary_string, truncate_upper_bound_text_string @@ -1344,9 +1343,7 @@ def project_table( # When 
FsSpec is not installed raise ValueError(f"Expected PyArrowFileIO or FsspecFileIO, got: {io}") from e - use_large_types = ( - val if isinstance(val := io.properties.get(PYARROW_USE_LARGE_TYPES_ON_READ, "True"), bool) else strtobool(val) - ) + use_large_types = property_as_bool(io.properties, PYARROW_USE_LARGE_TYPES_ON_READ, True) bound_row_filter = bind(table_metadata.schema(), row_filter, case_sensitive=case_sensitive) @@ -1440,9 +1437,8 @@ def project_batches( # When FsSpec is not installed raise ValueError(f"Expected PyArrowFileIO or FsspecFileIO, got: {io}") from e - use_large_types = ( - val if isinstance(val := io.properties.get(PYARROW_USE_LARGE_TYPES_ON_READ, "True"), bool) else strtobool(val) - ) + use_large_types = property_as_bool(io.properties, PYARROW_USE_LARGE_TYPES_ON_READ, True) + bound_row_filter = bind(table_metadata.schema(), row_filter, case_sensitive=case_sensitive) projected_field_ids = { From cf4052f66a441fed1654a1d7c40b2e6c42dde502 Mon Sep 17 00:00:00 2001 From: Sung Yun <107272191+sungwy@users.noreply.github.com> Date: Fri, 2 Aug 2024 14:12:33 +0000 Subject: [PATCH 5/9] fix --- tests/integration/test_reads.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/tests/integration/test_reads.py b/tests/integration/test_reads.py index 6e757f82b3..ae7c6e6f55 100644 --- a/tests/integration/test_reads.py +++ b/tests/integration/test_reads.py @@ -720,7 +720,7 @@ def test_table_scan_override_with_small_types(catalog: Catalog) -> None: tbl.append(arrow_table) - tbl.io.properties[PYARROW_USE_LARGE_TYPES_ON_READ] = False + tbl.io.properties[PYARROW_USE_LARGE_TYPES_ON_READ] = "False" result_table = tbl.scan().to_arrow() expected_schema = pa.schema([ From 5172918d3757a396b99e8f64859265905c23bd93 Mon Sep 17 00:00:00 2001 From: Sung Yun <107272191+sungwy@users.noreply.github.com> Date: Fri, 2 Aug 2024 18:01:53 +0000 Subject: [PATCH 6/9] docs --- mkdocs/docs/configuration.md | 10 ++++++++++ 1 file changed, 10 insertions(+) diff --git a/mkdocs/docs/configuration.md b/mkdocs/docs/configuration.md index ff3741656a..e6afc3e1cc 100644 --- a/mkdocs/docs/configuration.md +++ b/mkdocs/docs/configuration.md @@ -137,6 +137,16 @@ For the FileIO there are several configuration options available: +### PyArrow + + + +| Key | Example | Description | +| ------------------------------- | ------- | ------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------ | +| pyarrow.use-large-types-on-read | True | Use larger PyArrow types for [string](https://arrow.apache.org/docs/python/generated/pyarrow.large_string.html), [binary](https://arrow.apache.org/docs/python/generated/pyarrow.large_binary.html) and [list](https://arrow.apache.org/docs/python/generated/pyarrow.large_list.html) field types on table scans. | + + + ## Catalogs PyIceberg currently has native catalog type support for REST, SQL, Hive, Glue and DynamoDB. 
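[Reviewer note] To complement the configuration table added above, here is a usage sketch showing how the documented pyarrow.use-large-types-on-read property can be supplied as a FileIO property when loading a catalog. The catalog URI and table identifier are placeholders, not values from this change; the only key introduced by this patch series is pyarrow.use-large-types-on-read.

from pyiceberg.catalog import load_catalog

catalog = load_catalog(
    "default",
    **{
        "uri": "http://localhost:8181",              # placeholder catalog endpoint
        "pyarrow.use-large-types-on-read": "False",  # property introduced in this change
    },
)
table = catalog.load_table("default.my_table")       # placeholder identifier
# With the property set to "False", string/binary/list columns come back as
# pa.string()/pa.binary()/pa.list_() instead of the large_* variants.
print(table.scan().to_arrow().schema)
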
From cd819b374656674a7857cb437c59c8531516d52f Mon Sep 17 00:00:00 2001 From: Sung Yun <107272191+sungwy@users.noreply.github.com> Date: Fri, 2 Aug 2024 20:30:39 +0000 Subject: [PATCH 7/9] nits --- mkdocs/docs/configuration.md | 6 +++--- 1 file changed, 3 insertions(+), 3 deletions(-) diff --git a/mkdocs/docs/configuration.md b/mkdocs/docs/configuration.md index e6afc3e1cc..d4a8de3664 100644 --- a/mkdocs/docs/configuration.md +++ b/mkdocs/docs/configuration.md @@ -141,9 +141,9 @@ For the FileIO there are several configuration options available: -| Key | Example | Description | -| ------------------------------- | ------- | ------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------ | -| pyarrow.use-large-types-on-read | True | Use larger PyArrow types for [string](https://arrow.apache.org/docs/python/generated/pyarrow.large_string.html), [binary](https://arrow.apache.org/docs/python/generated/pyarrow.large_binary.html) and [list](https://arrow.apache.org/docs/python/generated/pyarrow.large_list.html) field types on table scans. | +| Key | Example | Description | +| ------------------------------- | ------- | --------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------- | +| pyarrow.use-large-types-on-read | True | Use large PyArrow types i.e. [large_string](https://arrow.apache.org/docs/python/generated/pyarrow.large_string.html), [large_binary](https://arrow.apache.org/docs/python/generated/pyarrow.large_binary.html) and [large_list](https://arrow.apache.org/docs/python/generated/pyarrow.large_list.html) field types on table scans. The default value is True. 
| From 0fd31d7c68aca0a82ee61a46621cb07df4b68228 Mon Sep 17 00:00:00 2001 From: "Sung Yun (CODE SIGNING KEY)" Date: Mon, 5 Aug 2024 11:49:40 -0400 Subject: [PATCH 8/9] respect flag on promotion --- pyiceberg/io/pyarrow.py | 24 +++++++++++++++++++----- tests/integration/test_reads.py | 27 +++++++++++++++++++++++---- 2 files changed, 42 insertions(+), 9 deletions(-) diff --git a/pyiceberg/io/pyarrow.py b/pyiceberg/io/pyarrow.py index 1a9601ab6a..ffa67f9c80 100644 --- a/pyiceberg/io/pyarrow.py +++ b/pyiceberg/io/pyarrow.py @@ -1250,7 +1250,9 @@ def _task_to_record_batches( arrow_table = pa.Table.from_batches([batch]) arrow_table = arrow_table.filter(pyarrow_filter) batch = arrow_table.to_batches()[0] - yield _to_requested_schema(projected_schema, file_project_schema, batch, downcast_ns_timestamp_to_us=True) + yield _to_requested_schema( + projected_schema, file_project_schema, batch, downcast_ns_timestamp_to_us=True, use_large_types=use_large_types + ) current_index += len(batch) @@ -1493,12 +1495,13 @@ def _to_requested_schema( batch: pa.RecordBatch, downcast_ns_timestamp_to_us: bool = False, include_field_ids: bool = False, + use_large_types: bool = True, ) -> pa.RecordBatch: # We could re-use some of these visitors struct_array = visit_with_partner( requested_schema, batch, - ArrowProjectionVisitor(file_schema, downcast_ns_timestamp_to_us, include_field_ids), + ArrowProjectionVisitor(file_schema, downcast_ns_timestamp_to_us, include_field_ids, use_large_types), ArrowAccessor(file_schema), ) return pa.RecordBatch.from_struct_array(struct_array) @@ -1508,20 +1511,31 @@ class ArrowProjectionVisitor(SchemaWithPartnerVisitor[pa.Array, Optional[pa.Arra _file_schema: Schema _include_field_ids: bool _downcast_ns_timestamp_to_us: bool + _use_large_types: bool - def __init__(self, file_schema: Schema, downcast_ns_timestamp_to_us: bool = False, include_field_ids: bool = False) -> None: + def __init__( + self, + file_schema: Schema, + downcast_ns_timestamp_to_us: bool = False, + include_field_ids: bool = False, + use_large_types: bool = True, + ) -> None: self._file_schema = file_schema self._include_field_ids = include_field_ids self._downcast_ns_timestamp_to_us = downcast_ns_timestamp_to_us + self._use_large_types = use_large_types def _cast_if_needed(self, field: NestedField, values: pa.Array) -> pa.Array: file_field = self._file_schema.find_field(field.field_id) if field.field_type.is_primitive: if field.field_type != file_field.field_type: - return values.cast( - schema_to_pyarrow(promote(file_field.field_type, field.field_type), include_field_ids=self._include_field_ids) + target_schema = schema_to_pyarrow( + promote(file_field.field_type, field.field_type), include_field_ids=self._include_field_ids ) + if not self._use_large_types: + target_schema = _pyarrow_schema_ensure_small_types(target_schema) + return values.cast(target_schema) elif (target_type := schema_to_pyarrow(field.field_type, include_field_ids=self._include_field_ids)) != values.type: if field.field_type == TimestampType(): # Downcasting of nanoseconds to microseconds diff --git a/tests/integration/test_reads.py b/tests/integration/test_reads.py index ae7c6e6f55..67c37ed671 100644 --- a/tests/integration/test_reads.py +++ b/tests/integration/test_reads.py @@ -47,6 +47,7 @@ from pyiceberg.schema import Schema from pyiceberg.table import Table from pyiceberg.types import ( + BinaryType, BooleanType, IntegerType, NestedField, @@ -673,8 +674,13 @@ def another_task() -> None: def test_table_scan_default_to_large_types(catalog: Catalog) 
-> None: identifier = "default.test_table_scan_default_to_large_types" arrow_table = pa.Table.from_arrays( - [pa.array(["a", "b", "c"]), pa.array([b"a", b"b", b"c"]), pa.array([["a", "b"], ["c", "d"], ["e", "f"]])], - names=["string", "binary", "list"], + [ + pa.array(["a", "b", "c"]), + pa.array(["a", "b", "c"]), + pa.array([b"a", b"b", b"c"]), + pa.array([["a", "b"], ["c", "d"], ["e", "f"]]), + ], + names=["string", "string-to-binary", "binary", "list"], ) try: @@ -689,10 +695,14 @@ def test_table_scan_default_to_large_types(catalog: Catalog) -> None: tbl.append(arrow_table) + with tbl.update_schema() as update_schema: + update_schema.update_column("string-to-binary", BinaryType()) + result_table = tbl.scan().to_arrow() expected_schema = pa.schema([ pa.field("string", pa.large_string()), + pa.field("string-to-binary", pa.large_binary()), pa.field("binary", pa.large_binary()), pa.field("list", pa.large_list(pa.large_string())), ]) @@ -704,8 +714,13 @@ def test_table_scan_default_to_large_types(catalog: Catalog) -> None: def test_table_scan_override_with_small_types(catalog: Catalog) -> None: identifier = "default.test_table_scan_override_with_small_types" arrow_table = pa.Table.from_arrays( - [pa.array(["a", "b", "c"]), pa.array([b"a", b"b", b"c"]), pa.array([["a", "b"], ["c", "d"], ["e", "f"]])], - names=["string", "binary", "list"], + [ + pa.array(["a", "b", "c"]), + pa.array(["a", "b", "c"]), + pa.array([b"a", b"b", b"c"]), + pa.array([["a", "b"], ["c", "d"], ["e", "f"]]), + ], + names=["string", "string-to-binary", "binary", "list"], ) try: @@ -720,11 +735,15 @@ def test_table_scan_override_with_small_types(catalog: Catalog) -> None: tbl.append(arrow_table) + with tbl.update_schema() as update_schema: + update_schema.update_column("string-to-binary", BinaryType()) + tbl.io.properties[PYARROW_USE_LARGE_TYPES_ON_READ] = "False" result_table = tbl.scan().to_arrow() expected_schema = pa.schema([ pa.field("string", pa.string()), + pa.field("string-to-binary", pa.binary()), pa.field("binary", pa.binary()), pa.field("list", pa.list_(pa.string())), ]) From ad6a6cba03e8cf61b3962f81ab05d69d55a48db6 Mon Sep 17 00:00:00 2001 From: Sung Yun <107272191+sungwy@users.noreply.github.com> Date: Tue, 6 Aug 2024 18:59:23 +0000 Subject: [PATCH 9/9] lint --- tests/integration/test_reads.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/tests/integration/test_reads.py b/tests/integration/test_reads.py index bf65be900f..a2d34661e9 100644 --- a/tests/integration/test_reads.py +++ b/tests/integration/test_reads.py @@ -749,7 +749,7 @@ def test_table_scan_override_with_small_types(catalog: Catalog) -> None: ]) assert result_table.schema.equals(expected_schema) - + @pytest.mark.integration @pytest.mark.parametrize("catalog", [pytest.lazy_fixture("session_catalog_hive"), pytest.lazy_fixture("session_catalog")]) def test_empty_scan_ordered_str(catalog: Catalog) -> None:
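[Reviewer note] A usage sketch mirroring the integration tests above (the table identifier is a placeholder and the table is assumed to already exist). Setting the IO property on an already-loaded table makes the subsequent scan return the small Arrow types, including for columns whose Iceberg type was promoted (e.g. string to binary), which is the case the later patches in this series cover.

from pyiceberg.catalog import load_catalog
from pyiceberg.io import PYARROW_USE_LARGE_TYPES_ON_READ

catalog = load_catalog("default")                # placeholder catalog name
tbl = catalog.load_table("default.my_table")     # placeholder identifier

# Per-table override, as exercised in test_table_scan_override_with_small_types:
tbl.io.properties[PYARROW_USE_LARGE_TYPES_ON_READ] = "False"
small_typed = tbl.scan().to_arrow()
# Expect e.g. string, binary, list<string> rather than their large_* counterparts.
print(small_typed.schema)
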