diff --git a/mkdocs/docs/configuration.md b/mkdocs/docs/configuration.md
index 3445b5a7b9..971ec53961 100644
--- a/mkdocs/docs/configuration.md
+++ b/mkdocs/docs/configuration.md
@@ -32,6 +32,7 @@ Iceberg tables support table properties to configure table behavior.
 | -------------------------------------- | --------------------------------- | ------- | --------------------------------------------------------------------------------------------- |
 | `write.parquet.compression-codec`      | `{uncompressed,zstd,gzip,snappy}` | zstd    | Sets the Parquet compression coddec.                                                            |
 | `write.parquet.compression-level`      | Integer                           | null    | Parquet compression level for the codec. If not set, it is up to PyIceberg                      |
+| `write.parquet.row-group-limit`        | Number of rows                    | 1048576 | The upper bound of the number of entries within a single row group                             |
 | `write.parquet.page-size-bytes`        | Size in bytes                     | 1MB     | Set a target threshold for the approximate encoded size of data pages within a column chunk    |
 | `write.parquet.page-row-limit`         | Number of rows                    | 20000   | Set a target threshold for the approximate encoded size of data pages within a column chunk    |
 | `write.parquet.dict-size-bytes`        | Size in bytes                     | 2MB     | Set the dictionary page size limit per row group                                                |
diff --git a/pyiceberg/io/pyarrow.py b/pyiceberg/io/pyarrow.py
index 5bbf65759c..e54a169907 100644
--- a/pyiceberg/io/pyarrow.py
+++ b/pyiceberg/io/pyarrow.py
@@ -2197,8 +2197,8 @@ def write_file(io: FileIO, table_metadata: TableMetadata, tasks: Iterator[WriteT
     parquet_writer_kwargs = _get_parquet_writer_kwargs(table_metadata.properties)
     row_group_size = property_as_int(
         properties=table_metadata.properties,
-        property_name=TableProperties.PARQUET_ROW_GROUP_SIZE_BYTES,
-        default=TableProperties.PARQUET_ROW_GROUP_SIZE_BYTES_DEFAULT,
+        property_name=TableProperties.PARQUET_ROW_GROUP_LIMIT,
+        default=TableProperties.PARQUET_ROW_GROUP_LIMIT_DEFAULT,
     )
 
     def write_parquet(task: WriteTask) -> DataFile:
diff --git a/pyiceberg/table/__init__.py b/pyiceberg/table/__init__.py
index bb39933ace..626f35350a 100644
--- a/pyiceberg/table/__init__.py
+++ b/pyiceberg/table/__init__.py
@@ -176,7 +176,7 @@ class TableProperties:
     PARQUET_ROW_GROUP_SIZE_BYTES_DEFAULT = 128 * 1024 * 1024  # 128 MB
 
     PARQUET_ROW_GROUP_LIMIT = "write.parquet.row-group-limit"
-    PARQUET_ROW_GROUP_LIMIT_DEFAULT = 128 * 1024 * 1024  # 128 MB
+    PARQUET_ROW_GROUP_LIMIT_DEFAULT = 1048576
 
     PARQUET_PAGE_SIZE_BYTES = "write.parquet.page-size-bytes"
     PARQUET_PAGE_SIZE_BYTES_DEFAULT = 1024 * 1024  # 1 MB
diff --git a/tests/integration/test_reads.py b/tests/integration/test_reads.py
index a2d34661e9..1664289040 100644
--- a/tests/integration/test_reads.py
+++ b/tests/integration/test_reads.py
@@ -50,6 +50,7 @@
     BinaryType,
     BooleanType,
     IntegerType,
+    LongType,
     NestedField,
     StringType,
     TimestampType,
@@ -670,6 +671,40 @@ def another_task() -> None:
 
 
 @pytest.mark.integration
+def test_configure_row_group_batch_size(session_catalog: Catalog) -> None:
+    from pyiceberg.table import TableProperties
+
+    table_name = "default.test_small_row_groups"
+    try:
+        session_catalog.drop_table(table_name)
+    except NoSuchTableError:
+        pass  # Just to make sure that the table doesn't exist
+
+    tbl = session_catalog.create_table(
+        table_name,
+        Schema(
+            NestedField(1, "number", LongType()),
+        ),
+        properties={TableProperties.PARQUET_ROW_GROUP_LIMIT: "1"},
+    )
+
+    # Write 10 row groups, that should end up as 10 batches
+    entries = 10
+    tbl.append(
+        pa.Table.from_pylist(
+            [
+                {
+                    "number": number,
+                }
+                for number in range(entries)
+            ],
+        )
+    )
+
+    batches = list(tbl.scan().to_arrow_batch_reader())
+    assert len(batches) == entries
+
+
 @pytest.mark.parametrize("catalog", [pytest.lazy_fixture("session_catalog_hive"), pytest.lazy_fixture("session_catalog")])
 def test_table_scan_default_to_large_types(catalog: Catalog) -> None:
     identifier = "default.test_table_scan_default_to_large_types"
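
For context, here is a minimal sketch of how the new `write.parquet.row-group-limit` property is used from user code, closely mirroring the integration test above. The catalog name passed to `load_catalog` and the table identifier are placeholders; only `TableProperties.PARQUET_ROW_GROUP_LIMIT`, the 1048576-row default, and the batch-reader behaviour come from the diff itself.

```python
import pyarrow as pa

from pyiceberg.catalog import load_catalog
from pyiceberg.schema import Schema
from pyiceberg.table import TableProperties
from pyiceberg.types import LongType, NestedField

# Placeholder catalog name; replace with a catalog configured in your environment.
catalog = load_catalog("default")

tbl = catalog.create_table(
    "default.small_row_groups_demo",
    schema=Schema(NestedField(1, "number", LongType())),
    # Cap each Parquet row group at a single row; the default is 1048576 rows.
    properties={TableProperties.PARQUET_ROW_GROUP_LIMIT: "1"},
)

tbl.append(pa.Table.from_pylist([{"number": n} for n in range(10)]))

# With one row per row group, the scan yields one record batch per row.
batches = list(tbl.scan().to_arrow_batch_reader())
assert len(batches) == 10
```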