Partitioned Append on Identity Transform #555

Merged: 20 commits, Apr 5, 2024

Conversation

@jqin61 (Contributor) commented Mar 28, 2024

As discussed in the monthly meeting, this is the first PR to break #353 down into 4 PRs:
1. Partitioned append with identity transform

The other three:
2. Dynamic overwrite using delete + append, 2 snapshots in one commit
3. Hidden partitioning support (for slicing the arrow table, manifest file entry.partition, data file path)
4. Static overwrite using delete + append, 2 snapshots in one commit

@jqin61 changed the title from "partitioned append on identity transform" to "Partitioned Append on Identity Transform" on Mar 28, 2024
table_metadata=table_metadata,
tasks=iter([WriteTask(write_uuid, next(counter), batches) for batches in bin_pack_arrow_table(df, target_file_size)]), # type: ignore
)
if any(len(spec.fields) > 0 for spec in table_metadata.partition_specs):
Contributor Author (@jqin61):

It seems the old line was not checking whether the table is partitioned, but was checking for partition evolution? The old line was:
if len([spec for spec in table_metadata.partition_specs if spec.spec_id != 0]) > 0:
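For illustration, here is the distinction between the two checks (a sketch only, using table_metadata.partition_specs as in the snippet above):

# Old check: true only when a spec other than the default spec_id 0 exists,
# i.e. it detects partition evolution rather than whether the table is partitioned.
has_evolved_spec = len([spec for spec in table_metadata.partition_specs if spec.spec_id != 0]) > 0

# New check: true whenever any spec has partition fields,
# i.e. the table is actually partitioned, even if only the default spec exists.
is_partitioned = any(len(spec.fields) > 0 for spec in table_metadata.partition_specs)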

Collaborator:

Great find!

Collaborator @sungwy left a comment:

@jqin61 - this PR is looking 🔥 🔥 It is super exciting to see this PR up and in such a great state already. I've left a few suggestions; please let me know if you want to discuss any of the suggested ideas in more detail.

pyiceberg/table/__init__.py: outdated review thread (resolved)
target_file_size = PropertyUtil.property_as_int(
properties=table_metadata.properties,
property_name=TableProperties.WRITE_TARGET_FILE_SIZE_BYTES,
default=TableProperties.WRITE_TARGET_FILE_SIZE_BYTES_DEFAULT,
)
if target_file_size is None:
raise ValueError(
"Fail to get neither TableProperties.WRITE_TARGET_FILE_SIZE_BYTES nor WRITE_TARGET_FILE_SIZE_BYTES_DEFAULT for writing target data file."
Collaborator @sungwy commented Mar 29, 2024:

I have mixed feelings about this exception check, because we are setting the default value of target_file_size to TableProperties.WRITE_TARGET_FILE_SIZE_BYTES_DEFAULT right in the previous line. It feels redundant.

I understand why we are doing it, though:

PropertyUtil.property_as_int returns Optional[int], and bin packing expects an int, so we need to type-check it.

If we run into more of these type-checking redundancies in the code base, where we are using property values that are always expected to have a non-null default value, maybe we should refactor PropertyUtil instead. Maybe we can have two methods: property_as_int, which returns an Optional[int], and property_as_int_with_default, which returns an int?
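For illustration, a minimal sketch of that split (a hypothetical refactor; the names and signatures here are assumptions, not existing PyIceberg API):

from typing import Dict, Optional

class PropertyUtil:
    @staticmethod
    def property_as_int(properties: Dict[str, str], property_name: str) -> Optional[int]:
        # Returns the parsed property, or None when it is absent.
        value = properties.get(property_name)
        return int(value) if value is not None else None

    @staticmethod
    def property_as_int_with_default(properties: Dict[str, str], property_name: str, default: int) -> int:
        # Same lookup, but a required non-null default guarantees an int for callers
        # such as bin packing, so no extra None check is needed at the call site.
        value = properties.get(property_name)
        return int(value) if value is not None else default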

Contributor Author (@jqin61):

property_as_int_with_default sounds better to me, because all the exceptions raised due to a missing default property could be centralized in that function. How do you feel about it?

Contributor:

I like that as well; the ValueError is misleading, and it is not directly obvious why we would raise it.

Contributor Author @jqin61 commented Apr 2, 2024:

I just found that the default value itself could be None:
PARQUET_COMPRESSION_LEVEL_DEFAULT = None
so this None check is not unnecessary?

The original code for this target_file_size check just type: ignores it:

table_metadata=table_metadata,
tasks=iter([WriteTask(write_uuid, next(counter), batches) for batches in bin_pack_arrow_table(df, target_file_size)]), # type: ignore
)
if any(len(spec.fields) > 0 for spec in table_metadata.partition_specs):

pyiceberg/table/__init__.py: outdated review threads (resolved)
"""
import pyarrow as pa

partition_columns = get_partition_columns(iceberg_table_metadata, arrow_table)
Collaborator:

How do you feel about this suggestion? Most of this function's responsibility seems to lie in making sure that the partition field is provided in the arrow_table, but we seem to already be checking the schema in the write functions now.

Suggested change
partition_columns = get_partition_columns(iceberg_table_metadata, arrow_table)
partition_columns = [iceberg_table_metadata.schema().find_column_name(partition_field.source_id) for partition_field in iceberg_table_metadata.spec().fields]

Contributor Author (@jqin61):

It will be more useful when there are hidden partition columns. The check is also there for the mypy check, because find_column_name returns Optional[str].
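For illustration, a small sketch of that guard (variable names and the error message here are assumptions, not the PR's exact code):

from typing import List

partition_columns: List[str] = []
for partition_field in iceberg_table_metadata.spec().fields:
    column_name = iceberg_table_metadata.schema().find_column_name(partition_field.source_id)
    if column_name is None:
        # find_column_name returns Optional[str]; fail loudly instead of passing None along
        raise ValueError(f"Could not find column for partition source id: {partition_field.source_id}")
    partition_columns.append(column_name)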

pyiceberg/table/__init__.py: outdated review threads (resolved)
Contributor Author @jqin61 left a comment:

@syun64 Please give another round of review, thank you!

Contributor @Fokko left a comment:

Left some small comments; apart from that it looks good to me 👍

@@ -289,10 +286,7 @@ def partition_field_to_data_file_partition_field(partition_field_type: IcebergTy


@partition_field_to_data_file_partition_field.register(LongType)
@partition_field_to_data_file_partition_field.register(DateType)
Contributor:

This single-dispatch is there only for the TimeType, it seems. Probably we should also convert those into a native type.

Contributor Author (@jqin61):

Fixed in commit 82dd3ad.

Contributor:

Beautiful, thanks 👍

@@ -1131,8 +1133,11 @@ def append(self, df: pa.Table, snapshot_properties: Dict[str, str] = EMPTY_DICT)
if not isinstance(df, pa.Table):
raise ValueError(f"Expected PyArrow table, got: {df}")

if len(self.spec().fields) > 0:
raise ValueError("Cannot write to partitioned tables")
supported = {IdentityTransform}
Contributor:

Nit:

Suggested change
supported = {IdentityTransform}
supported_transforms = {IdentityTransform}
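For context, a rough sketch of how the identity-only guard in append might read with that name (a sketch only; the exact validation and error message in the PR may differ):

supported_transforms = {IdentityTransform}
unsupported_partitions = [
    field for field in self.spec().fields if type(field.transform) not in supported_transforms
]
if unsupported_partitions:
    raise ValueError(
        f"Not all partition transforms are supported for writes: {unsupported_partitions}."
    )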


# Later to be extended with partition information
def generate_data_file_partition_path(self) -> str:
Contributor:

Nit: This function looks redundant. The check is being done in generate_data_file_path() as well. I would merge those two.
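A rough sketch of what the merged helper might look like (partition_key.to_path and generate_data_file_filename are assumptions based on the surrounding code, not necessarily the PR's exact methods):

def generate_data_file_path(self, extension: str) -> str:
    if self.partition_key:
        # Render the partition directory (e.g. "category=books") and prepend it to the filename.
        partition_path = self.partition_key.to_path()
        return f"{partition_path}/{self.generate_data_file_filename(extension)}"
    return self.generate_data_file_filename(extension)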


return table_partitions


def partition(spec: PartitionSpec, schema: Schema, arrow_table: pa.Table) -> Iterable[TablePartition]:
Contributor:

It would be good to have a slightly more descriptive name here. I also think we should hide this from the outside user.

Suggested change
def partition(spec: PartitionSpec, schema: Schema, arrow_table: pa.Table) -> Iterable[TablePartition]:
def _determine_partitions(spec: PartitionSpec, schema: Schema, arrow_table: pa.Table) -> List[TablePartition]:

I think we can also return a list, so folks know that it is already materialized.
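A hypothetical call site under that signature, just to illustrate the materialized-list point (names other than arrow_table_partition are placeholders):

partitions = _determine_partitions(spec=table_metadata.spec(), schema=table_metadata.schema(), arrow_table=df)
# A list rather than a generator: it can be sized and iterated more than once.
print(f"writing {len(partitions)} partition slices")
for table_partition in partitions:
    write_partition(table_partition.arrow_table_partition)  # hypothetical consumer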

schema=table_metadata.schema(),
)
for partition in partitions
for batches in bin_pack_arrow_table(partition.arrow_table_partition, target_file_size)
Contributor:

This looks very nice!

pyiceberg/typedef.py: review thread (resolved)
@@ -2000,7 +2000,11 @@ def spark() -> "SparkSession":
'float': [0.0, None, 0.9],
'double': [0.0, None, 0.9],
'timestamp': [datetime(2023, 1, 1, 19, 25, 00), None, datetime(2023, 3, 1, 19, 25, 00)],
'timestamptz': [datetime(2023, 1, 1, 19, 25, 00), None, datetime(2023, 3, 1, 19, 25, 00)],
'timestamptz': [
Contributor:

Nice one!

Comment on lines 2056 to 2058
import pyarrow as pa

"""PyArrow table with all kinds of columns."""
Contributor:

Suggested change
import pyarrow as pa
"""PyArrow table with all kinds of columns."""
"""PyArrow table with all kinds of columns."""
import pyarrow as pa

Comment on lines 2064 to 2066
import pyarrow as pa

"""PyArrow table with all kinds of columns."""
Contributor:

Suggested change
import pyarrow as pa
"""PyArrow table with all kinds of columns."""
"""PyArrow table with all kinds of columns."""
import pyarrow as pa

Contributor Author @jqin61 left a comment:

Thanks for reviewing, @Fokko! I applied your suggestions and it is ready for another round of review. Thank you!

Contributor @Fokko left a comment:

This looks great, @jqin61. Thanks again for working on this 👍



def data_file_with_partition(partition_type: StructType, format_version: TableVersion) -> StructType:
def data_file_with_partition(partition_type: StructType, format_version: Literal[1, 2]) -> StructType:
Contributor:

Nit:

Suggested change
def data_file_with_partition(partition_type: StructType, format_version: Literal[1, 2]) -> StructType:
def data_file_with_partition(partition_type: StructType, format_version: TableVersion) -> StructType:


@Fokko merged commit 4148edb into apache:main on Apr 5, 2024
7 checks passed