From 5a7c8f9a99f1435dfc302a352d1fbe7619f9acf8 Mon Sep 17 00:00:00 2001 From: Kevin Liu Date: Thu, 22 Feb 2024 18:40:10 -0800 Subject: [PATCH 01/14] bin pack write --- pyiceberg/io/pyarrow.py | 81 +++++++++++++++++++------------------ pyiceberg/table/__init__.py | 10 +++-- 2 files changed, 49 insertions(+), 42 deletions(-) diff --git a/pyiceberg/io/pyarrow.py b/pyiceberg/io/pyarrow.py index 7192513a2d..72ddd5e69c 100644 --- a/pyiceberg/io/pyarrow.py +++ b/pyiceberg/io/pyarrow.py @@ -1722,54 +1722,57 @@ def fill_parquet_file_metadata( def write_file(io: FileIO, table_metadata: TableMetadata, tasks: Iterator[WriteTask]) -> Iterator[DataFile]: - task = next(tasks) - - try: - _ = next(tasks) - # If there are more tasks, raise an exception - raise NotImplementedError("Only unpartitioned writes are supported: https://github.com/apache/iceberg-python/issues/208") - except StopIteration: - pass - - parquet_writer_kwargs = _get_parquet_writer_kwargs(table_metadata.properties) - - file_path = f'{table_metadata.location}/data/{task.generate_data_file_filename("parquet")}' schema = table_metadata.schema() arrow_file_schema = schema_to_pyarrow(schema) + parquet_writer_kwargs = _get_parquet_writer_kwargs(table_metadata.properties) - fo = io.new_output(file_path) row_group_size = PropertyUtil.property_as_int( properties=table_metadata.properties, property_name=TableProperties.PARQUET_ROW_GROUP_SIZE_BYTES, default=TableProperties.PARQUET_ROW_GROUP_SIZE_BYTES_DEFAULT, ) - with fo.create(overwrite=True) as fos: - with pq.ParquetWriter(fos, schema=arrow_file_schema, **parquet_writer_kwargs) as writer: - writer.write_table(task.df, row_group_size=row_group_size) - - data_file = DataFile( - content=DataFileContent.DATA, - file_path=file_path, - file_format=FileFormat.PARQUET, - partition=Record(), - file_size_in_bytes=len(fo), - # After this has been fixed: - # https://github.com/apache/iceberg-python/issues/271 - # sort_order_id=task.sort_order_id, - sort_order_id=None, - # Just copy these from the table for now - spec_id=table_metadata.default_spec_id, - equality_ids=None, - key_metadata=None, - ) + data_files = [] + for task in tasks: + file_path = f'{table_metadata.location}/data/{task.generate_data_file_filename("parquet")}' + fo = io.new_output(file_path) + with fo.create(overwrite=True) as fos: + with pq.ParquetWriter(fos, schema=arrow_file_schema, **parquet_writer_kwargs) as writer: + for batch in task.record_batches: + writer.write_batch(batch, row_group_size=row_group_size) + + data_file = DataFile( + content=DataFileContent.DATA, + file_path=file_path, + file_format=FileFormat.PARQUET, + partition=Record(), + file_size_in_bytes=len(fo), + # After this has been fixed: + # https://github.com/apache/iceberg-python/issues/271 + # sort_order_id=task.sort_order_id, + sort_order_id=None, + # Just copy these from the table for now + spec_id=table_metadata.default_spec_id, + equality_ids=None, + key_metadata=None, + ) + fill_parquet_file_metadata( + data_file=data_file, + parquet_metadata=writer.writer.metadata, + stats_columns=compute_statistics_plan(schema, table_metadata.properties), + parquet_column_mapping=parquet_path_to_id_mapping(schema), + ) + data_files.append(data_file) + return iter(data_files) - fill_parquet_file_metadata( - data_file=data_file, - parquet_metadata=writer.writer.metadata, - stats_columns=compute_statistics_plan(schema, table_metadata.properties), - parquet_column_mapping=parquet_path_to_id_mapping(schema), - ) - return iter([data_file]) + +def bin_pack_arrow_table(tbl: pa.Table) 
-> Iterator[List[pa.RecordBatch]]: + # bin-pack the table into 256 MB chunks + from pyiceberg.utils.bin_packing import PackingIterator + + splits = tbl.to_batches() + target_weight = 2 << 27 # 256 MB + bin_packed = PackingIterator(splits, target_weight, lookback=2, weight_func=lambda x: x.nbytes, largest_bin_first=True) + return bin_packed ICEBERG_UNCOMPRESSED_CODEC = "uncompressed" diff --git a/pyiceberg/table/__init__.py b/pyiceberg/table/__init__.py index 76aa533d7c..7d3ded2064 100644 --- a/pyiceberg/table/__init__.py +++ b/pyiceberg/table/__init__.py @@ -2429,7 +2429,7 @@ def _add_and_move_fields( class WriteTask: write_uuid: uuid.UUID task_id: int - df: pa.Table + record_batches: List[pa.RecordBatch] sort_order_id: Optional[int] = None # Later to be extended with partition information @@ -2458,7 +2458,7 @@ def _dataframe_to_data_files( Returns: An iterable that supplies datafiles that represent the table. """ - from pyiceberg.io.pyarrow import write_file + from pyiceberg.io.pyarrow import bin_pack_arrow_table, write_file if len([spec for spec in table_metadata.partition_specs if spec.spec_id != 0]) > 0: raise ValueError("Cannot write to partitioned tables") @@ -2468,7 +2468,11 @@ def _dataframe_to_data_files( # This is an iter, so we don't have to materialize everything every time # This will be more relevant when we start doing partitioned writes - yield from write_file(io=io, table_metadata=table_metadata, tasks=iter([WriteTask(write_uuid, next(counter), df)])) + yield from write_file( + io=io, + table_metadata=table_metadata, + tasks=iter([WriteTask(write_uuid, next(counter), batches) for batches in bin_pack_arrow_table(df)]), + ) class _MergingSnapshotProducer(UpdateTableMetadata["_MergingSnapshotProducer"]): From bae32e2f04f1b9a69c77a82ee3543e965eba2e16 Mon Sep 17 00:00:00 2001 From: Kevin Liu Date: Thu, 22 Feb 2024 18:55:16 -0800 Subject: [PATCH 02/14] add write target file size config --- pyiceberg/io/pyarrow.py | 21 +++++++++++++++------ pyiceberg/table/__init__.py | 7 ++++++- 2 files changed, 21 insertions(+), 7 deletions(-) diff --git a/pyiceberg/io/pyarrow.py b/pyiceberg/io/pyarrow.py index 72ddd5e69c..cdab0054e8 100644 --- a/pyiceberg/io/pyarrow.py +++ b/pyiceberg/io/pyarrow.py @@ -1765,14 +1765,23 @@ def write_file(io: FileIO, table_metadata: TableMetadata, tasks: Iterator[WriteT return iter(data_files) -def bin_pack_arrow_table(tbl: pa.Table) -> Iterator[List[pa.RecordBatch]]: - # bin-pack the table into 256 MB chunks +def bin_pack_arrow_table(tbl: pa.Table, table_properties: Properties) -> Iterator[List[pa.RecordBatch]]: from pyiceberg.utils.bin_packing import PackingIterator - splits = tbl.to_batches() - target_weight = 2 << 27 # 256 MB - bin_packed = PackingIterator(splits, target_weight, lookback=2, weight_func=lambda x: x.nbytes, largest_bin_first=True) - return bin_packed + target_file_size = PropertyUtil.property_as_int( + properties=table_properties, + property_name=TableProperties.WRITE_TARGET_FILE_SIZE_BYTES, + default=TableProperties.WRITE_TARGET_FILE_SIZE_BYTES_DEFAULT, + ) + assert isinstance(target_file_size, int) + bin_packed_record_batches = PackingIterator( + items=tbl.to_batches(), + target_weight=target_file_size, + lookback=2, + weight_func=lambda x: x.nbytes, + largest_bin_first=True, + ) + return bin_packed_record_batches ICEBERG_UNCOMPRESSED_CODEC = "uncompressed" diff --git a/pyiceberg/table/__init__.py b/pyiceberg/table/__init__.py index 7d3ded2064..70e7ed86fd 100644 --- a/pyiceberg/table/__init__.py +++ b/pyiceberg/table/__init__.py @@ -207,6 
+207,9 @@ class TableProperties: PARQUET_BLOOM_FILTER_COLUMN_ENABLED_PREFIX = "write.parquet.bloom-filter-enabled.column" + WRITE_TARGET_FILE_SIZE_BYTES = "write.target-file-size-bytes" + WRITE_TARGET_FILE_SIZE_BYTES_DEFAULT = 512 * 1024 * 1024 # 512 MB + DEFAULT_WRITE_METRICS_MODE = "write.metadata.metrics.default" DEFAULT_WRITE_METRICS_MODE_DEFAULT = "truncate(16)" @@ -2471,7 +2474,9 @@ def _dataframe_to_data_files( yield from write_file( io=io, table_metadata=table_metadata, - tasks=iter([WriteTask(write_uuid, next(counter), batches) for batches in bin_pack_arrow_table(df)]), + tasks=iter([ + WriteTask(write_uuid, next(counter), batches) for batches in bin_pack_arrow_table(df, table_metadata.properties) + ]), ) From 2730a8f4751b5e2ab2c2994816a23c9f5715ede2 Mon Sep 17 00:00:00 2001 From: Kevin Liu Date: Thu, 22 Feb 2024 21:09:06 -0800 Subject: [PATCH 03/14] test --- tests/conftest.py | 59 +++++++++++++++++++++++++++++++++++++++- tests/io/test_pyarrow.py | 25 ++++++++++++++++- 2 files changed, 82 insertions(+), 2 deletions(-) diff --git a/tests/conftest.py b/tests/conftest.py index a005966ea5..ca13a8c94b 100644 --- a/tests/conftest.py +++ b/tests/conftest.py @@ -30,7 +30,7 @@ import socket import string import uuid -from datetime import datetime +from datetime import date, datetime from pathlib import Path from random import choice from tempfile import TemporaryDirectory @@ -1976,3 +1976,60 @@ def spark() -> SparkSession: ) return spark + + +TEST_DATA_WITH_NULL = { + 'bool': [False, None, True], + 'string': ['a', None, 'z'], + # Go over the 16 bytes to kick in truncation + 'string_long': ['a' * 22, None, 'z' * 22], + 'int': [1, None, 9], + 'long': [1, None, 9], + 'float': [0.0, None, 0.9], + 'double': [0.0, None, 0.9], + 'timestamp': [datetime(2023, 1, 1, 19, 25, 00), None, datetime(2023, 3, 1, 19, 25, 00)], + 'timestamptz': [datetime(2023, 1, 1, 19, 25, 00), None, datetime(2023, 3, 1, 19, 25, 00)], + 'date': [date(2023, 1, 1), None, date(2023, 3, 1)], + # Not supported by Spark + # 'time': [time(1, 22, 0), None, time(19, 25, 0)], + # Not natively supported by Arrow + # 'uuid': [uuid.UUID('00000000-0000-0000-0000-000000000000').bytes, None, uuid.UUID('11111111-1111-1111-1111-111111111111').bytes], + 'binary': [b'\01', None, b'\22'], + 'fixed': [ + uuid.UUID('00000000-0000-0000-0000-000000000000').bytes, + None, + uuid.UUID('11111111-1111-1111-1111-111111111111').bytes, + ], +} + + +@pytest.fixture(scope="session") +def pa_schema() -> "pa.Schema": + import pyarrow as pa + + return pa.schema([ + ("bool", pa.bool_()), + ("string", pa.string()), + ("string_long", pa.string()), + ("int", pa.int32()), + ("long", pa.int64()), + ("float", pa.float32()), + ("double", pa.float64()), + ("timestamp", pa.timestamp(unit="us")), + ("timestamptz", pa.timestamp(unit="us", tz="UTC")), + ("date", pa.date32()), + # Not supported by Spark + # ("time", pa.time64("us")), + # Not natively supported by Arrow + # ("uuid", pa.fixed(16)), + ("binary", pa.large_binary()), + ("fixed", pa.binary(16)), + ]) + + +@pytest.fixture(scope="session") +def arrow_table_with_null(pa_schema: "pa.Schema") -> "pa.Table": + import pyarrow as pa + + """PyArrow table with all kinds of columns""" + return pa.Table.from_pydict(TEST_DATA_WITH_NULL, schema=pa_schema) diff --git a/tests/io/test_pyarrow.py b/tests/io/test_pyarrow.py index 2acffdfdf9..7bcd13e83a 100644 --- a/tests/io/test_pyarrow.py +++ b/tests/io/test_pyarrow.py @@ -64,6 +64,7 @@ _ConvertToArrowSchema, _primitive_to_physical, _read_deletes, + bin_pack_arrow_table, 
expression_to_pyarrow, project_table, schema_to_pyarrow, @@ -71,7 +72,7 @@ from pyiceberg.manifest import DataFile, DataFileContent, FileFormat from pyiceberg.partitioning import PartitionSpec from pyiceberg.schema import Schema, make_compatible_name, visit -from pyiceberg.table import FileScanTask, Table +from pyiceberg.table import FileScanTask, Table, TableProperties from pyiceberg.table.metadata import TableMetadataV2 from pyiceberg.typedef import UTF8 from pyiceberg.types import ( @@ -1710,3 +1711,25 @@ def test_stats_aggregator_update_max(vals: List[Any], primitive_type: PrimitiveT stats.update_max(val) assert stats.current_max == expected_result + + +def test_bin_pack_arrow_table(arrow_table_with_null: pa.Table) -> None: + # default packs to 1 bin since the table is small + bin_packed = bin_pack_arrow_table(arrow_table_with_null, {}) + assert len(list(bin_packed)) == 1 + + # as long as table is smaller than default target size, it should pack to 1 bin + bigger_arrow_tbl = pa.concat_tables([arrow_table_with_null] * 10) + assert bigger_arrow_tbl.nbytes < TableProperties.WRITE_TARGET_FILE_SIZE_BYTES_DEFAULT + bin_packed = bin_pack_arrow_table(bigger_arrow_tbl, {}) + assert len(list(bin_packed)) == 1 + + # unless we override the target size to be smaller + target_file_size = arrow_table_with_null.nbytes + bin_packed = bin_pack_arrow_table(bigger_arrow_tbl, {TableProperties.WRITE_TARGET_FILE_SIZE_BYTES: target_file_size}) + assert len(list(bin_packed)) == 10 + + # and will produce half the number of files if we double the target size + target_file_size = arrow_table_with_null.nbytes * 2 + bin_packed = bin_pack_arrow_table(bigger_arrow_tbl, {TableProperties.WRITE_TARGET_FILE_SIZE_BYTES: target_file_size}) + assert len(list(bin_packed)) == 5 From ef64c9264cbd3b0ab269d5bcbea63e4ebf8395e4 Mon Sep 17 00:00:00 2001 From: Kevin Liu Date: Fri, 23 Feb 2024 09:55:54 -0800 Subject: [PATCH 04/14] add test for multiple data files --- tests/integration/test_writes.py | 27 +++++++++++++++++++++++++++ 1 file changed, 27 insertions(+) diff --git a/tests/integration/test_writes.py b/tests/integration/test_writes.py index 82c4ace711..6add21f8d0 100644 --- a/tests/integration/test_writes.py +++ b/tests/integration/test_writes.py @@ -378,6 +378,33 @@ def get_current_snapshot_id(identifier: str) -> int: assert tbl.current_snapshot().snapshot_id == get_current_snapshot_id(identifier) # type: ignore +@pytest.mark.integration +@pytest.mark.parametrize("format_version", [1, 2]) +def test_write_multiple_data_files( + spark: SparkSession, session_catalog: Catalog, arrow_table_with_null: pa.Table, format_version: int +) -> None: + identifier = "default.write_multiple_arrow_data_files" + tbl = _create_table(session_catalog, identifier, {"format-version": format_version}, []) + + def get_data_files_count(identifier: str) -> int: + return spark.sql( + f""" + SELECT * + FROM {identifier}.all_data_files + """ + ).count() + + # writes to 1 data file since the table is small + tbl.overwrite(arrow_table_with_null) + assert get_data_files_count(identifier) == 1 + + # writes to 1 data file as long as table is smaller than default target file size + bigger_arrow_tbl = pa.concat_tables([arrow_table_with_null] * 10) + tbl.overwrite(bigger_arrow_tbl) + assert bigger_arrow_tbl.nbytes < TableProperties.WRITE_TARGET_FILE_SIZE_BYTES_DEFAULT + assert get_data_files_count(identifier) == 1 + + @pytest.mark.integration @pytest.mark.parametrize("format_version", [1, 2]) @pytest.mark.parametrize( From 
9cb96494f1fc77f1d1fe2ca250fd1a7c704a3c84 Mon Sep 17 00:00:00 2001 From: Kevin Liu Date: Fri, 23 Feb 2024 12:55:19 -0800 Subject: [PATCH 05/14] parquet writer write once --- pyiceberg/io/pyarrow.py | 3 +-- 1 file changed, 1 insertion(+), 2 deletions(-) diff --git a/pyiceberg/io/pyarrow.py b/pyiceberg/io/pyarrow.py index cdab0054e8..a93d288e94 100644 --- a/pyiceberg/io/pyarrow.py +++ b/pyiceberg/io/pyarrow.py @@ -1737,8 +1737,7 @@ def write_file(io: FileIO, table_metadata: TableMetadata, tasks: Iterator[WriteT fo = io.new_output(file_path) with fo.create(overwrite=True) as fos: with pq.ParquetWriter(fos, schema=arrow_file_schema, **parquet_writer_kwargs) as writer: - for batch in task.record_batches: - writer.write_batch(batch, row_group_size=row_group_size) + writer.write(pa.Table.from_batches(task.record_batches), row_group_size=row_group_size) data_file = DataFile( content=DataFileContent.DATA, From 3f284b2f086bea5891585dcc1f04fcf378c95bed Mon Sep 17 00:00:00 2001 From: Kevin Liu Date: Fri, 23 Feb 2024 15:32:25 -0800 Subject: [PATCH 06/14] parallelize write tasks --- pyiceberg/io/pyarrow.py | 10 +++++++--- 1 file changed, 7 insertions(+), 3 deletions(-) diff --git a/pyiceberg/io/pyarrow.py b/pyiceberg/io/pyarrow.py index a93d288e94..40132e878c 100644 --- a/pyiceberg/io/pyarrow.py +++ b/pyiceberg/io/pyarrow.py @@ -1731,8 +1731,8 @@ def write_file(io: FileIO, table_metadata: TableMetadata, tasks: Iterator[WriteT property_name=TableProperties.PARQUET_ROW_GROUP_SIZE_BYTES, default=TableProperties.PARQUET_ROW_GROUP_SIZE_BYTES_DEFAULT, ) - data_files = [] - for task in tasks: + + def write_parquet(task: WriteTask) -> DataFile: file_path = f'{table_metadata.location}/data/{task.generate_data_file_filename("parquet")}' fo = io.new_output(file_path) with fo.create(overwrite=True) as fos: @@ -1760,7 +1760,11 @@ def write_file(io: FileIO, table_metadata: TableMetadata, tasks: Iterator[WriteT stats_columns=compute_statistics_plan(schema, table_metadata.properties), parquet_column_mapping=parquet_path_to_id_mapping(schema), ) - data_files.append(data_file) + return data_file + + executor = ExecutorFactory.get_or_create() + data_files = executor.map(write_parquet, tasks) + return iter(data_files) From 6462d068bb99691ccd01fe8ea9143df46761eec0 Mon Sep 17 00:00:00 2001 From: Kevin Liu Date: Sun, 25 Feb 2024 15:13:50 -0800 Subject: [PATCH 07/14] refactor --- pyiceberg/io/pyarrow.py | 8 +------- pyiceberg/table/__init__.py | 11 ++++++++--- tests/io/test_pyarrow.py | 12 ++++++------ 3 files changed, 15 insertions(+), 16 deletions(-) diff --git a/pyiceberg/io/pyarrow.py b/pyiceberg/io/pyarrow.py index 40132e878c..ce51f1b3fd 100644 --- a/pyiceberg/io/pyarrow.py +++ b/pyiceberg/io/pyarrow.py @@ -1768,15 +1768,9 @@ def write_parquet(task: WriteTask) -> DataFile: return iter(data_files) -def bin_pack_arrow_table(tbl: pa.Table, table_properties: Properties) -> Iterator[List[pa.RecordBatch]]: +def bin_pack_arrow_table(tbl: pa.Table, target_file_size: int) -> Iterator[List[pa.RecordBatch]]: from pyiceberg.utils.bin_packing import PackingIterator - target_file_size = PropertyUtil.property_as_int( - properties=table_properties, - property_name=TableProperties.WRITE_TARGET_FILE_SIZE_BYTES, - default=TableProperties.WRITE_TARGET_FILE_SIZE_BYTES_DEFAULT, - ) - assert isinstance(target_file_size, int) bin_packed_record_batches = PackingIterator( items=tbl.to_batches(), target_weight=target_file_size, diff --git a/pyiceberg/table/__init__.py b/pyiceberg/table/__init__.py index 70e7ed86fd..692fb789be 100644 --- 
a/pyiceberg/table/__init__.py +++ b/pyiceberg/table/__init__.py @@ -2469,14 +2469,19 @@ def _dataframe_to_data_files( counter = itertools.count(0) write_uuid = write_uuid or uuid.uuid4() + target_file_size = PropertyUtil.property_as_int( + properties=table_metadata.properties, + property_name=TableProperties.WRITE_TARGET_FILE_SIZE_BYTES, + default=TableProperties.WRITE_TARGET_FILE_SIZE_BYTES_DEFAULT, + ) + assert isinstance(target_file_size, int) + # This is an iter, so we don't have to materialize everything every time # This will be more relevant when we start doing partitioned writes yield from write_file( io=io, table_metadata=table_metadata, - tasks=iter([ - WriteTask(write_uuid, next(counter), batches) for batches in bin_pack_arrow_table(df, table_metadata.properties) - ]), + tasks=iter([WriteTask(write_uuid, next(counter), batches) for batches in bin_pack_arrow_table(df, target_file_size)]), ) diff --git a/tests/io/test_pyarrow.py b/tests/io/test_pyarrow.py index 7bcd13e83a..33c254daed 100644 --- a/tests/io/test_pyarrow.py +++ b/tests/io/test_pyarrow.py @@ -1715,21 +1715,21 @@ def test_stats_aggregator_update_max(vals: List[Any], primitive_type: PrimitiveT def test_bin_pack_arrow_table(arrow_table_with_null: pa.Table) -> None: # default packs to 1 bin since the table is small - bin_packed = bin_pack_arrow_table(arrow_table_with_null, {}) + bin_packed = bin_pack_arrow_table( + arrow_table_with_null, target_file_size=TableProperties.WRITE_TARGET_FILE_SIZE_BYTES_DEFAULT + ) assert len(list(bin_packed)) == 1 # as long as table is smaller than default target size, it should pack to 1 bin bigger_arrow_tbl = pa.concat_tables([arrow_table_with_null] * 10) assert bigger_arrow_tbl.nbytes < TableProperties.WRITE_TARGET_FILE_SIZE_BYTES_DEFAULT - bin_packed = bin_pack_arrow_table(bigger_arrow_tbl, {}) + bin_packed = bin_pack_arrow_table(bigger_arrow_tbl, target_file_size=TableProperties.WRITE_TARGET_FILE_SIZE_BYTES_DEFAULT) assert len(list(bin_packed)) == 1 # unless we override the target size to be smaller - target_file_size = arrow_table_with_null.nbytes - bin_packed = bin_pack_arrow_table(bigger_arrow_tbl, {TableProperties.WRITE_TARGET_FILE_SIZE_BYTES: target_file_size}) + bin_packed = bin_pack_arrow_table(bigger_arrow_tbl, target_file_size=arrow_table_with_null.nbytes) assert len(list(bin_packed)) == 10 # and will produce half the number of files if we double the target size - target_file_size = arrow_table_with_null.nbytes * 2 - bin_packed = bin_pack_arrow_table(bigger_arrow_tbl, {TableProperties.WRITE_TARGET_FILE_SIZE_BYTES: target_file_size}) + bin_packed = bin_pack_arrow_table(bigger_arrow_tbl, target_file_size=arrow_table_with_null.nbytes * 2) assert len(list(bin_packed)) == 5 From fd1efe018e084b75c69f131fb6880e44d114de8a Mon Sep 17 00:00:00 2001 From: Kevin Liu Date: Sun, 25 Feb 2024 15:45:47 -0800 Subject: [PATCH 08/14] chunk correctly using to_batches --- pyiceberg/io/pyarrow.py | 5 ++++- 1 file changed, 4 insertions(+), 1 deletion(-) diff --git a/pyiceberg/io/pyarrow.py b/pyiceberg/io/pyarrow.py index ce51f1b3fd..78d943c240 100644 --- a/pyiceberg/io/pyarrow.py +++ b/pyiceberg/io/pyarrow.py @@ -1771,8 +1771,11 @@ def write_parquet(task: WriteTask) -> DataFile: def bin_pack_arrow_table(tbl: pa.Table, target_file_size: int) -> Iterator[List[pa.RecordBatch]]: from pyiceberg.utils.bin_packing import PackingIterator + avg_row_size_bytes = tbl.nbytes / tbl.num_rows + max_chunksize = target_file_size // avg_row_size_bytes + batches = tbl.to_batches(max_chunksize) bin_packed_record_batches = 
PackingIterator( - items=tbl.to_batches(), + items=batches, target_weight=target_file_size, lookback=2, weight_func=lambda x: x.nbytes, From 7ccfdb2967620f10bbfd6d6946662207f0990598 Mon Sep 17 00:00:00 2001 From: Kevin Liu Date: Mon, 26 Feb 2024 22:41:34 -0800 Subject: [PATCH 09/14] change variable names --- pyiceberg/io/pyarrow.py | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/pyiceberg/io/pyarrow.py b/pyiceberg/io/pyarrow.py index 78d943c240..90b0e22f59 100644 --- a/pyiceberg/io/pyarrow.py +++ b/pyiceberg/io/pyarrow.py @@ -1772,8 +1772,8 @@ def bin_pack_arrow_table(tbl: pa.Table, target_file_size: int) -> Iterator[List[ from pyiceberg.utils.bin_packing import PackingIterator avg_row_size_bytes = tbl.nbytes / tbl.num_rows - max_chunksize = target_file_size // avg_row_size_bytes - batches = tbl.to_batches(max_chunksize) + target_rows_per_file = target_file_size // avg_row_size_bytes + batches = tbl.to_batches(max_chunksize=target_rows_per_file) bin_packed_record_batches = PackingIterator( items=batches, target_weight=target_file_size, From 1ee3a5597e314da6d57fd47b9c4ae238bd2686fc Mon Sep 17 00:00:00 2001 From: Kevin Liu Date: Wed, 6 Mar 2024 09:04:40 -0800 Subject: [PATCH 10/14] get rid of assert --- pyiceberg/table/__init__.py | 3 +-- 1 file changed, 1 insertion(+), 2 deletions(-) diff --git a/pyiceberg/table/__init__.py b/pyiceberg/table/__init__.py index 692fb789be..411318ca87 100644 --- a/pyiceberg/table/__init__.py +++ b/pyiceberg/table/__init__.py @@ -2474,14 +2474,13 @@ def _dataframe_to_data_files( property_name=TableProperties.WRITE_TARGET_FILE_SIZE_BYTES, default=TableProperties.WRITE_TARGET_FILE_SIZE_BYTES_DEFAULT, ) - assert isinstance(target_file_size, int) # This is an iter, so we don't have to materialize everything every time # This will be more relevant when we start doing partitioned writes yield from write_file( io=io, table_metadata=table_metadata, - tasks=iter([WriteTask(write_uuid, next(counter), batches) for batches in bin_pack_arrow_table(df, target_file_size)]), + tasks=iter([WriteTask(write_uuid, next(counter), batches) for batches in bin_pack_arrow_table(df, target_file_size)]), # type: ignore ) From f92de1a1af2f71e7a2058acb7457b265ca14f75c Mon Sep 17 00:00:00 2001 From: Kevin Liu Date: Wed, 6 Mar 2024 09:15:47 -0800 Subject: [PATCH 11/14] configure PackingIterator --- pyiceberg/io/pyarrow.py | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/pyiceberg/io/pyarrow.py b/pyiceberg/io/pyarrow.py index 90b0e22f59..7f446e0638 100644 --- a/pyiceberg/io/pyarrow.py +++ b/pyiceberg/io/pyarrow.py @@ -1777,9 +1777,9 @@ def bin_pack_arrow_table(tbl: pa.Table, target_file_size: int) -> Iterator[List[ bin_packed_record_batches = PackingIterator( items=batches, target_weight=target_file_size, - lookback=2, + lookback=len(batches), # ignore lookback weight_func=lambda x: x.nbytes, - largest_bin_first=True, + largest_bin_first=False, ) return bin_packed_record_batches From 0047fd8887b24695b10c3804d580019f772998b9 Mon Sep 17 00:00:00 2001 From: Kevin Liu Date: Wed, 6 Mar 2024 12:50:14 -0800 Subject: [PATCH 12/14] add more tests --- tests/integration/test_writes.py | 41 +++++++++++++++++++++++--------- 1 file changed, 30 insertions(+), 11 deletions(-) diff --git a/tests/integration/test_writes.py b/tests/integration/test_writes.py index 6add21f8d0..51c60dcadc 100644 --- a/tests/integration/test_writes.py +++ b/tests/integration/test_writes.py @@ -36,8 +36,8 @@ from pyiceberg.catalog.sql import SqlCatalog from pyiceberg.exceptions import 
NoSuchTableError from pyiceberg.schema import Schema -from pyiceberg.table import Table, _dataframe_to_data_files from pyiceberg.typedef import Properties +from pyiceberg.table import SetPropertiesUpdate, Table, TableProperties, _dataframe_to_data_files from pyiceberg.types import ( BinaryType, BooleanType, @@ -379,31 +379,50 @@ def get_current_snapshot_id(identifier: str) -> int: @pytest.mark.integration -@pytest.mark.parametrize("format_version", [1, 2]) -def test_write_multiple_data_files( - spark: SparkSession, session_catalog: Catalog, arrow_table_with_null: pa.Table, format_version: int -) -> None: - identifier = "default.write_multiple_arrow_data_files" - tbl = _create_table(session_catalog, identifier, {"format-version": format_version}, []) +def test_write_bin_pack_data_files(spark: SparkSession, session_catalog: Catalog, arrow_table_with_null: pa.Table) -> None: + identifier = "default.write_bin_pack_data_files" + tbl = _create_table(session_catalog, identifier, {"format-version": "1"}, []) def get_data_files_count(identifier: str) -> int: return spark.sql( f""" SELECT * - FROM {identifier}.all_data_files + FROM {identifier}.files """ ).count() - # writes to 1 data file since the table is small + def set_table_properties(tbl: Table, properties: Properties) -> Table: + with tbl.transaction() as transaction: + transaction._apply((SetPropertiesUpdate(updates=properties),)) + return tbl + + # writes 1 data file since the table is smaller than default target file size + assert arrow_table_with_null.nbytes < TableProperties.WRITE_TARGET_FILE_SIZE_BYTES_DEFAULT tbl.overwrite(arrow_table_with_null) assert get_data_files_count(identifier) == 1 - # writes to 1 data file as long as table is smaller than default target file size + # writes 1 data file as long as table is smaller than default target file size bigger_arrow_tbl = pa.concat_tables([arrow_table_with_null] * 10) - tbl.overwrite(bigger_arrow_tbl) assert bigger_arrow_tbl.nbytes < TableProperties.WRITE_TARGET_FILE_SIZE_BYTES_DEFAULT + tbl.overwrite(bigger_arrow_tbl) assert get_data_files_count(identifier) == 1 + # writes multiple data files once target file size is overridden + target_file_size = arrow_table_with_null.nbytes + tbl = set_table_properties(tbl, {TableProperties.WRITE_TARGET_FILE_SIZE_BYTES: str(target_file_size)}) + assert str(target_file_size) == tbl.properties.get(TableProperties.WRITE_TARGET_FILE_SIZE_BYTES) + assert target_file_size < bigger_arrow_tbl.nbytes + tbl.overwrite(bigger_arrow_tbl) + assert get_data_files_count(identifier) == 10 + + # writes half the number of data files when target file size doubles + target_file_size = arrow_table_with_null.nbytes * 2 + tbl = set_table_properties(tbl, {TableProperties.WRITE_TARGET_FILE_SIZE_BYTES: str(target_file_size)}) + assert str(target_file_size) == tbl.properties.get(TableProperties.WRITE_TARGET_FILE_SIZE_BYTES) + assert target_file_size < bigger_arrow_tbl.nbytes + tbl.overwrite(bigger_arrow_tbl) + assert get_data_files_count(identifier) == 5 + @pytest.mark.integration @pytest.mark.parametrize("format_version", [1, 2]) From c6cb8de6d65ffb10104987f03aa26d2da114267b Mon Sep 17 00:00:00 2001 From: Kevin Liu Date: Fri, 8 Mar 2024 08:25:53 -0800 Subject: [PATCH 13/14] rewrite set_properties --- tests/integration/test_writes.py | 11 +++-------- 1 file changed, 3 insertions(+), 8 deletions(-) diff --git a/tests/integration/test_writes.py b/tests/integration/test_writes.py index 51c60dcadc..e653bc868a 100644 --- a/tests/integration/test_writes.py +++ 
b/tests/integration/test_writes.py @@ -36,8 +36,8 @@ from pyiceberg.catalog.sql import SqlCatalog from pyiceberg.exceptions import NoSuchTableError from pyiceberg.schema import Schema +from pyiceberg.table import Table, TableProperties, _dataframe_to_data_files from pyiceberg.typedef import Properties -from pyiceberg.table import SetPropertiesUpdate, Table, TableProperties, _dataframe_to_data_files from pyiceberg.types import ( BinaryType, BooleanType, @@ -391,11 +391,6 @@ def get_data_files_count(identifier: str) -> int: """ ).count() - def set_table_properties(tbl: Table, properties: Properties) -> Table: - with tbl.transaction() as transaction: - transaction._apply((SetPropertiesUpdate(updates=properties),)) - return tbl - # writes 1 data file since the table is smaller than default target file size assert arrow_table_with_null.nbytes < TableProperties.WRITE_TARGET_FILE_SIZE_BYTES_DEFAULT tbl.overwrite(arrow_table_with_null) @@ -409,7 +404,7 @@ def set_table_properties(tbl: Table, properties: Properties) -> Table: # writes multiple data files once target file size is overridden target_file_size = arrow_table_with_null.nbytes - tbl = set_table_properties(tbl, {TableProperties.WRITE_TARGET_FILE_SIZE_BYTES: str(target_file_size)}) + tbl = tbl.transaction().set_properties({TableProperties.WRITE_TARGET_FILE_SIZE_BYTES: str(target_file_size)}).commit_transaction() assert str(target_file_size) == tbl.properties.get(TableProperties.WRITE_TARGET_FILE_SIZE_BYTES) assert target_file_size < bigger_arrow_tbl.nbytes tbl.overwrite(bigger_arrow_tbl) @@ -417,7 +412,7 @@ def set_table_properties(tbl: Table, properties: Properties) -> Table: # writes half the number of data files when target file size doubles target_file_size = arrow_table_with_null.nbytes * 2 - tbl = set_table_properties(tbl, {TableProperties.WRITE_TARGET_FILE_SIZE_BYTES: str(target_file_size)}) + tbl = tbl.transaction().set_properties({TableProperties.WRITE_TARGET_FILE_SIZE_BYTES: str(target_file_size)}).commit_transaction() assert str(target_file_size) == tbl.properties.get(TableProperties.WRITE_TARGET_FILE_SIZE_BYTES) assert target_file_size < bigger_arrow_tbl.nbytes tbl.overwrite(bigger_arrow_tbl) From d80054d52ab63d3ebcdd0d0d156976ee62126859 Mon Sep 17 00:00:00 2001 From: Kevin Liu Date: Sat, 9 Mar 2024 08:49:01 -0800 Subject: [PATCH 14/14] set int property --- tests/integration/test_writes.py | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/tests/integration/test_writes.py b/tests/integration/test_writes.py index e653bc868a..a3c603141b 100644 --- a/tests/integration/test_writes.py +++ b/tests/integration/test_writes.py @@ -404,7 +404,7 @@ def get_data_files_count(identifier: str) -> int: # writes multiple data files once target file size is overridden target_file_size = arrow_table_with_null.nbytes - tbl = tbl.transaction().set_properties({TableProperties.WRITE_TARGET_FILE_SIZE_BYTES: str(target_file_size)}).commit_transaction() + tbl = tbl.transaction().set_properties({TableProperties.WRITE_TARGET_FILE_SIZE_BYTES: target_file_size}).commit_transaction() assert str(target_file_size) == tbl.properties.get(TableProperties.WRITE_TARGET_FILE_SIZE_BYTES) assert target_file_size < bigger_arrow_tbl.nbytes tbl.overwrite(bigger_arrow_tbl) @@ -412,7 +412,7 @@ def get_data_files_count(identifier: str) -> int: # writes half the number of data files when target file size doubles target_file_size = arrow_table_with_null.nbytes * 2 - tbl = tbl.transaction().set_properties({TableProperties.WRITE_TARGET_FILE_SIZE_BYTES: 
str(target_file_size)}).commit_transaction() + tbl = tbl.transaction().set_properties({TableProperties.WRITE_TARGET_FILE_SIZE_BYTES: target_file_size}).commit_transaction() assert str(target_file_size) == tbl.properties.get(TableProperties.WRITE_TARGET_FILE_SIZE_BYTES) assert target_file_size < bigger_arrow_tbl.nbytes tbl.overwrite(bigger_arrow_tbl)
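
Taken together, patches 01, 08, 09 and 11 leave bin_pack_arrow_table in the shape sketched below. This is a minimal, self-contained rendering for anyone who wants to exercise the packing step in isolation: it assumes only pyarrow plus the pyiceberg.utils.bin_packing.PackingIterator already used above, and the sample table, the 1 MiB target and the max(..., 1) guard are illustration-only additions rather than part of the series.

from typing import Iterator, List

import pyarrow as pa

from pyiceberg.utils.bin_packing import PackingIterator


def bin_pack_sketch(tbl: pa.Table, target_file_size: int) -> Iterator[List[pa.RecordBatch]]:
    # Estimate how many rows fit into one target-sized file so that no single
    # batch overshoots the target on its own (the series uses plain floor division;
    # the int cast and the floor of 1 are defensive additions in this sketch).
    avg_row_size_bytes = tbl.nbytes / tbl.num_rows
    target_rows_per_file = max(int(target_file_size // avg_row_size_bytes), 1)
    batches = tbl.to_batches(max_chunksize=target_rows_per_file)

    # Greedily group batches until a bin's total nbytes would exceed the target.
    # lookback=len(batches) effectively disables the lookback window and
    # largest_bin_first=False keeps the bins in row order, as configured in patch 11.
    return PackingIterator(
        items=batches,
        target_weight=target_file_size,
        lookback=len(batches),
        weight_func=lambda batch: batch.nbytes,
        largest_bin_first=False,
    )


# Hypothetical usage: a table of a few MB packed against a 1 MiB target comes back
# as several lists of record batches; in the series each list becomes one WriteTask
# and therefore one Parquet data file.
tbl = pa.table({"id": list(range(200_000)), "payload": ["x" * 8] * 200_000})
bins = list(bin_pack_sketch(tbl, target_file_size=1024 * 1024))
print(len(bins), [sum(batch.nbytes for batch in batches) for batches in bins])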
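
Patch 06 replaces the sequential per-bin loop with executor.map(write_parquet, tasks), where ExecutorFactory.get_or_create() presumably hands back a shared thread pool. A tiny standard-library sketch of the same fan-out pattern, with placeholder task tuples standing in for WriteTask objects and a fake file name standing in for the DataFile result:

from concurrent.futures import ThreadPoolExecutor


def write_one(task):
    task_id, batches = task
    # Stand-in for write_parquet above: write one file for this bin of batches
    # and return something that describes it.
    return f"00000-{task_id}.parquet"


tasks = [(0, ["batch-a"]), (1, ["batch-b", "batch-c"]), (2, ["batch-d"])]
with ThreadPoolExecutor() as executor:
    # map() submits every task up front and yields the results in task order.
    data_files = list(executor.map(write_one, tasks))
print(data_files)

Note that ThreadPoolExecutor.map schedules all work immediately and only yields results lazily; if ExecutorFactory wraps a similar pool, the writes in patch 06 start as soon as write_file is called rather than when the returned iterator is consumed.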
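
Finally, the behavior exercised by test_write_bin_pack_data_files, written out as a usage sketch. The catalog name, table identifier and toy tables are placeholders; only overwrite, the transaction/set_properties chain and the write.target-file-size-bytes property come from this series, and the file counts reflect the assertions in the integration test.

import pyarrow as pa

from pyiceberg.catalog import load_catalog
from pyiceberg.table import TableProperties

catalog = load_catalog("default")            # placeholder catalog name
tbl = catalog.load_table("default.events")   # placeholder identifier

small = pa.table({"n": list(range(1_000))})
big = pa.concat_tables([small] * 10)

# Under the default 512 MB target, even the concatenated table is written as a
# single data file.
tbl.overwrite(big)

# Shrink the target to the size of the small table and the same write is
# bin-packed into roughly ten data files instead.
tbl = (
    tbl.transaction()
    .set_properties({TableProperties.WRITE_TARGET_FILE_SIZE_BYTES: small.nbytes})
    .commit_transaction()
)
tbl.overwrite(big)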