Python: ManifestWriter and ManifestListWriter #8012
Fokko left a comment:
This is great @JonasJ-ap!
| NestedField(field_id=140, name="sort_order_id", field_type=IntegerType(), required=False, doc="Sort order ID"), | ||
| NestedField(field_id=141, name="spec_id", field_type=IntegerType(), required=False, doc="Partition spec ID"), | ||
| ) | ||
| def add_extension(self, filename: str) -> str: |
We probably also want to include the compression in the extension (.zstd.parquet, etc.). Do we want to use a Literal here?
| def add_extension(self, filename: str) -> str: | |
| def add_extension(self, format: Literal['parquet', 'orc', 'avro']) -> str: |
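The suggestion above could be sketched roughly as follows. This is only an illustration, not the PR's code: the standalone function form, the `FileFormatName` alias, and the optional `compression` parameter are assumptions made for the example.

```python
from typing import Literal, Optional

# Hypothetical alias for the formats named in the review suggestion.
FileFormatName = Literal["parquet", "orc", "avro"]


def add_extension(filename: str, file_format: FileFormatName, compression: Optional[str] = None) -> str:
    """Append the format (and, if given, the compression codec) to a file name.

    Assumption: the codec goes before the format, as in ".zstd.parquet".
    """
    if compression:
        return f"{filename}.{compression}.{file_format}"
    return f"{filename}.{file_format}"
```

A `Literal` is only checked by static type checkers such as mypy, so an unexpected string still passes through at runtime unless it is validated explicitly.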
| def write_manifest( | ||
| format_version: int, spec: PartitionSpec, schema: Schema, output_file: OutputFile, snapshot_id: int |
| format_version: int, spec: PartitionSpec, schema: Schema, output_file: OutputFile, snapshot_id: int | |
| format_version: Literal[1, 2], spec: PartitionSpec, schema: Schema, output_file: OutputFile, snapshot_id: int |
| elif format_version == 2: | ||
| return ManifestWriterV2(spec, schema, output_file, snapshot_id) | ||
| else: | ||
| # TODO: replace it with UnsupportedOperationException |
I think a ValueError is reasonable.
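A minimal sketch of what raising a ValueError could look like, combined with the `Literal[1, 2]` annotation suggested earlier. The helper name `select_writer_name` and the error message are hypothetical, and the function returns class names as strings only so that the example is self-contained.

```python
from typing import Literal


def select_writer_name(format_version: Literal[1, 2]) -> str:
    """Pick a writer for the given table format version (illustrative only)."""
    if format_version == 1:
        return "ManifestWriterV1"
    elif format_version == 2:
        return "ManifestWriterV2"
    else:
        # Literal is not enforced at runtime, so an explicit check is still useful.
        raise ValueError(f"Cannot write manifest for table version: {format_version}")
```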
| upper_bound=to_bytes(self._type, self._max) if self._max is not None else None, | ||
| ) | ||
| def update(self, value: Any) -> PartitionFieldStats: |
Do we need to return PartitionFieldStats? We don't use it below.
| self._min = value | ||
| self._max = value | ||
| # TODO: may need to implement a custom comparator for incompatible types | ||
| elif value < self._min: |
I would use Python's built-in min and max.
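As a rough sketch of that idea, assuming a simplified stand-in for the PR's `PartitionFieldStats` (the real class also tracks the field type, and skipping `None` values here is an assumption made for the example):

```python
from typing import Any, Optional


class PartitionFieldStats:
    """Simplified stand-in that tracks the lower and upper bound of one partition field."""

    def __init__(self) -> None:
        self._min: Optional[Any] = None
        self._max: Optional[Any] = None

    def update(self, value: Any) -> None:
        # Assumption: null partition values do not affect the bounds.
        if value is None:
            return
        # The built-ins replace the hand-written if/elif comparison chain.
        self._min = value if self._min is None else min(self._min, value)
        self._max = value if self._max is None else max(self._max, value)
```

Returning `None` instead of the stats object also matches the earlier remark that the return value of `update` is not used.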
| for i, field_type in enumerate(self._types): | ||
| assert isinstance(field_type, PrimitiveType), f"Expected a primitive type for the partition field, got {field_type}" | ||
| partition_key = partition_keys[i] | ||
| self._fields[i].update(conversions.partition_to_py(field_type, partition_key)) |
I'm not sure about the partition_to_py:
{
"name": "partition",
"type": {
"type": "record",
"name": "r102",
"fields": [{
"name": "tpep_pickup_datetime_day",
"type": ["null", {
"type": "int",
"logicalType": "date"
}],
"default": null,
"field-id": 1000
}]
},
"field-id": 102
}
It looks like this is encoded as an int.
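For context, Avro's `date` logical type annotates an `int` that counts days since the Unix epoch (1970-01-01). A small sketch of that mapping, using hypothetical helper names rather than the functions in `pyiceberg.conversions`:

```python
from datetime import date, timedelta

EPOCH = date(1970, 1, 1)


def days_to_date(days: int) -> date:
    """Convert an Avro date value (days since the epoch) to a datetime.date."""
    return EPOCH + timedelta(days=days)


def date_to_days(value: date) -> int:
    """Convert a datetime.date to the number of days since the epoch."""
    return (value - EPOCH).days
```

If the partition value already arrives as an int like this, an extra conversion step may not be needed before comparing values for the min/max bounds.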
| return f"{filename}.{self.name.lower()}" | ||
| def data_file_type(partition_type: StructType) -> StructType: |
I'm not a fan of this one, but I see why it is necessary. For reading, we can override certain field IDs:
iceberg/python/pyiceberg/manifest.py
Lines 319 to 335 in e389e4d
def read_manifest_list(input_file: InputFile) -> Iterator[ManifestFile]:
    """
    Reads the manifests from the manifest list.

    Args:
        input_file: The input file where the stream can be read from.

    Returns:
        An iterator of ManifestFiles that are part of the list.
    """
    with AvroFile[ManifestFile](
        input_file,
        MANIFEST_FILE_SCHEMA,
        read_types={-1: ManifestFile, 508: PartitionFieldSummary},
        read_enums={517: ManifestContent},
    ) as reader:
        yield from reader
We could do the same when writing. We can override field-id 102 when constructing the writer. WDYT?
| def summaries(self) -> List[PartitionFieldSummary]: | ||
| return [field.to_summary() for field in self._fields] | ||
| def update(self, partition_keys: Record) -> PartitionSummary: |
More on a meta-level: instead of this and the class above, I would probably write a function to convert PartitionSpecs to PartitionSummaries. I think that's more Pythonic (and for me also easier to follow, but that's super personal of course).
| StructType, | ||
| ) | ||
| # TODO: Double-check what's its purpose in java |
I'm not exactly sure what you're referring to. But when writing ManifestEntries, the sequence number is set to null so that, if there is a conflict when we commit, we can retry. When retrying, we don't want to have to rewrite the manifest files to update the sequence number. Therefore they are left null when written the first time. This is called sequence number inheritance: https://iceberg.apache.org/spec/#sequence-number-inheritance
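The read side of this idea can be illustrated with a deliberately simplified sketch. The `Entry` dataclass and `inherit_sequence_number` function are hypothetical and ignore entry status and other details covered by the spec; they only show how a null value written at commit time gets filled in from the manifest's own sequence number later.

```python
from dataclasses import dataclass, replace
from typing import Optional


@dataclass(frozen=True)
class Entry:
    """Hypothetical, stripped-down manifest entry."""

    file_path: str
    sequence_number: Optional[int] = None  # left null when first written


def inherit_sequence_number(entry: Entry, manifest_sequence_number: int) -> Entry:
    """Fill in a null sequence number from the manifest that contains the entry."""
    if entry.sequence_number is None:
        return replace(entry, sequence_number=manifest_sequence_number)
    return entry
```

Because the entry itself stores null, a retried commit can be assigned a new sequence number in the manifest list without rewriting the manifest file.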
| schema, | ||
| output_file, | ||
| snapshot_id, | ||
| { |
More of a style thing, but I would prefer named arguments here.
Based on offline discussion, this PR will continue in #8622. Thus closing this one. Thanks!
Implements ManifestWriter and ManifestListWriter, which are part of the Iceberg commit phase.
Based on: #7873
This PR currently includes prototypes of both writers, which are still subject to changes and improvements. I would greatly appreciate receiving some initial review and suggestions to foster the discussion around the development of the overall commit phase. Your insights and feedback would be invaluable. Thank you in advance for your kind assistance!