Allow setting write.parquet.row-group-limit (#1016)
* Allow setting `write.parquet.row-group-limit`

And update the docs

* Add test

* Make ruff happy

---------

Co-authored-by: Sung Yun <[email protected]>
Fokko and sungwy authored Aug 8, 2024
1 parent 50077af commit debda66
Showing 4 changed files with 39 additions and 3 deletions.
1 change: 1 addition & 0 deletions mkdocs/docs/configuration.md
@@ -32,6 +32,7 @@ Iceberg tables support table properties to configure table behavior.
| -------------------------------------- | --------------------------------- | ------- | ------------------------------------------------------------------------------------------- |
| `write.parquet.compression-codec`      | `{uncompressed,zstd,gzip,snappy}` | zstd    | Sets the Parquet compression codec.                                                           |
| `write.parquet.compression-level` | Integer | null | Parquet compression level for the codec. If not set, it is up to PyIceberg |
| `write.parquet.row-group-limit` | Number of rows | 1048576 | The upper bound of the number of entries within a single row group |
| `write.parquet.page-size-bytes` | Size in bytes | 1MB | Set a target threshold for the approximate encoded size of data pages within a column chunk |
| `write.parquet.page-row-limit`         | Number of rows                    | 20000   | Set a target threshold for the maximum number of rows within a column chunk                  |
| `write.parquet.dict-size-bytes` | Size in bytes | 2MB | Set the dictionary page size limit per row group |
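For readers of this table, a minimal sketch of setting these write properties, including the new `write.parquet.row-group-limit`, at table-creation time. The catalog name `default` and the identifier `default.my_table` are hypothetical:

```python
from pyiceberg.catalog import load_catalog
from pyiceberg.schema import Schema
from pyiceberg.types import LongType, NestedField

catalog = load_catalog("default")  # assumes a catalog named "default" is configured

tbl = catalog.create_table(
    "default.my_table",  # hypothetical identifier
    Schema(NestedField(1, "number", LongType(), required=False)),
    properties={
        "write.parquet.compression-codec": "zstd",
        "write.parquet.row-group-limit": "100000",  # cap each row group at 100k rows
    },
)
```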
4 changes: 2 additions & 2 deletions pyiceberg/io/pyarrow.py
@@ -2197,8 +2197,8 @@ def write_file(io: FileIO, table_metadata: TableMetadata, tasks: Iterator[WriteT
     parquet_writer_kwargs = _get_parquet_writer_kwargs(table_metadata.properties)
     row_group_size = property_as_int(
         properties=table_metadata.properties,
-        property_name=TableProperties.PARQUET_ROW_GROUP_SIZE_BYTES,
-        default=TableProperties.PARQUET_ROW_GROUP_SIZE_BYTES_DEFAULT,
+        property_name=TableProperties.PARQUET_ROW_GROUP_LIMIT,
+        default=TableProperties.PARQUET_ROW_GROUP_LIMIT_DEFAULT,
     )
 
     def write_parquet(task: WriteTask) -> DataFile:
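The resolved value is used as a row cap per Parquet row group when writing data files. A standalone sketch (plain pyarrow, not PyIceberg's writer code) of what `row_group_size` does; the output path is hypothetical:

```python
import pyarrow as pa
import pyarrow.parquet as pq

arrow_table = pa.table({"number": list(range(10))})

# Assume the table property resolved to 1, as in the integration test below.
row_group_limit = 1

with pq.ParquetWriter("/tmp/small_row_groups.parquet", schema=arrow_table.schema) as writer:
    writer.write_table(arrow_table, row_group_size=row_group_limit)

# Each row lands in its own row group.
print(pq.ParquetFile("/tmp/small_row_groups.parquet").metadata.num_row_groups)  # 10
```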
2 changes: 1 addition & 1 deletion pyiceberg/table/__init__.py
@@ -176,7 +176,7 @@ class TableProperties:
     PARQUET_ROW_GROUP_SIZE_BYTES_DEFAULT = 128 * 1024 * 1024  # 128 MB
 
     PARQUET_ROW_GROUP_LIMIT = "write.parquet.row-group-limit"
-    PARQUET_ROW_GROUP_LIMIT_DEFAULT = 128 * 1024 * 1024  # 128 MB
+    PARQUET_ROW_GROUP_LIMIT_DEFAULT = 1048576
 
     PARQUET_PAGE_SIZE_BYTES = "write.parquet.page-size-bytes"
     PARQUET_PAGE_SIZE_BYTES_DEFAULT = 1024 * 1024  # 1 MB
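The new default of 1048576 rows (1024 * 1024) can be overridden per table. A hedged sketch of adjusting it on an existing table, assuming a configured `default` catalog and a hypothetical `default.my_table`:

```python
from pyiceberg.catalog import load_catalog
from pyiceberg.table import TableProperties

catalog = load_catalog("default")
tbl = catalog.load_table("default.my_table")

with tbl.transaction() as tx:
    # TableProperties.PARQUET_ROW_GROUP_LIMIT == "write.parquet.row-group-limit"
    tx.set_properties({TableProperties.PARQUET_ROW_GROUP_LIMIT: "100000"})
```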
35 changes: 35 additions & 0 deletions tests/integration/test_reads.py
@@ -50,6 +50,7 @@
     BinaryType,
     BooleanType,
     IntegerType,
+    LongType,
     NestedField,
     StringType,
     TimestampType,
@@ -670,6 +671,40 @@ def another_task() -> None:


@pytest.mark.integration
def test_configure_row_group_batch_size(session_catalog: Catalog) -> None:
    from pyiceberg.table import TableProperties

    table_name = "default.test_small_row_groups"
    try:
        session_catalog.drop_table(table_name)
    except NoSuchTableError:
        pass  # Just to make sure that the table doesn't exist

    tbl = session_catalog.create_table(
        table_name,
        Schema(
            NestedField(1, "number", LongType()),
        ),
        properties={TableProperties.PARQUET_ROW_GROUP_LIMIT: "1"},
    )

    # Write 10 row groups, that should end up as 10 batches
    entries = 10
    tbl.append(
        pa.Table.from_pylist(
            [
                {
                    "number": number,
                }
                for number in range(entries)
            ],
        )
    )

    batches = list(tbl.scan().to_arrow_batch_reader())
    assert len(batches) == entries


@pytest.mark.parametrize("catalog", [pytest.lazy_fixture("session_catalog_hive"), pytest.lazy_fixture("session_catalog")])
def test_table_scan_default_to_large_types(catalog: Catalog) -> None:
identifier = "default.test_table_scan_default_to_large_types"
