Python: Avro write #7873
Conversation
This commit also fixes some small bugs uncovered by the new tests
Fokko left a comment
This is looking great @maxdebayser
JonasJ-ap left a comment
Thank you for your great work!
Fokko left a comment
Sorry for the long wait, 0.4.0 was taking some time. This PR LGTM, thanks @maxdebayser for working on this!
It stores the number of milliseconds from midnight, 00:00:00.000
"""
self.write_int(int(time_object_to_micros(dt) / 1000))
I don't think this is needed. Iceberg doesn't allow writing millisecond precision timestamps.
| """A string is encoded as a long followed by that many bytes of UTF-8 encoded character data.""" | ||
| self.write_bytes(s.encode("utf-8")) | ||
|
|
||
| def write_date_int(self, d: date) -> None: |
Why not simply write_date?
It stores the number of days from the unix epoch, 1 January 1970 (ISO calendar).
"""
self.write_int(date_to_days(d))
Is this method needed? I thought our internal representation was already int and not datetime.date.
I think we can get rid of it, and make it part of the write tree
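A minimal sketch of that suggestion, assuming the date_to_days helper and the encoder's write_int from the diff above (the write_date name and the free-standing function are illustrative, not the PR's final code): the conversion lives in the write tree and the encoder only ever sees the physical int.

from datetime import date

from pyiceberg.utils.datetime import date_to_days


def write_date(encoder, d: date) -> None:
    # The writer converts date -> days-from-epoch; the encoder just writes the int.
    encoder.write_int(date_to_days(d))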
def write_bytes_fixed(self, b: bytes) -> None:
    """Writes fixed number of bytes."""
    self.write(struct.pack(f"{len(b)}s", b))
self.write already accepts bytes. Why does this need to use struct.pack?
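For illustration, struct.pack with an "Ns" format of exactly the input length passes the bytes through unchanged, so calling self.write(b) directly is equivalent:

import struct

payload = b"\x00\x01\x02\x03"
# Packing len(payload) bytes with the "Ns" format copies them through unchanged.
assert struct.pack(f"{len(payload)}s", payload) == payload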
bits_to_write = packed_bits >> (8 * index)
self.write(bytearray([bits_to_write & 0xFF]))

def write_decimal_fixed(self, datum: decimal.Decimal, size: int) -> None:
@Fokko, if I remember correctly, we replaced these implementations with more native Python in the read path. We can probably do the same thing here for faster encoding and simpler code.
Yeah, here it is:

unscaled_datum = int.from_bytes(data, byteorder="big", signed=True)
return unscaled_to_decimal(unscaled_datum, scale)

Maybe there's an encoder equivalent to int.from_bytes that we can use to simplify this.
Thanks, that's much nicer indeed.
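The write-side counterpart is int.to_bytes. A minimal sketch that mirrors the decode snippet above (a standalone helper for illustration, not the PR's method):

import decimal


def decimal_to_fixed_bytes(value: decimal.Decimal, size: int) -> bytes:
    # Scale away the exponent to get the unscaled integer, then emit it as a
    # big-endian, two's-complement value padded to the fixed size.
    unscaled = int(value.scaleb(-value.as_tuple().exponent))
    return unscaled.to_bytes(size, byteorder="big", signed=True)


# Decimal("123.45") with scale 2 has unscaled value 12345.
assert decimal_to_fixed_bytes(decimal.Decimal("123.45"), 8) == (12345).to_bytes(8, byteorder="big", signed=True)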
def time_object_to_micros(t: time) -> int:
    """Converts an datetime.time object to microseconds from midnight."""
    return int(t.hour * 60 * 60 * 1e6 + t.minute * 60 * 1e6 + t.second * 1e6 + t.microsecond)
I think this should use the same logic as line 67 above. There are a couple of good things about that form:
- It stays in int and doesn't introduce any floating point values. That avoids the cast and, more importantly, avoids any floating point math that may introduce errors.
- It's easier to read and understand that it is correct.
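That integer-only form (it appears in a later hunk of this diff) as a standalone sketch:

from datetime import time


def time_object_to_micros(t: time) -> int:
    # Pure integer arithmetic: no 1e6 floats, no cast, no rounding error.
    return (((t.hour * 60 + t.minute) * 60) + t.second) * 1_000_000 + t.microsecond


assert time_object_to_micros(time(12, 30, 15, 500)) == 45_015_000_500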
| """ | ||
| self.write_int(time_object_to_micros(dt)) | ||
|
|
||
| def write_timestamp_millis_long(self, dt: datetime) -> None: |
Again, no need for the millis.
| """ | ||
| self.write_int(int(datetime_to_micros(dt) / 1000)) | ||
|
|
||
| def write_timestamp_micros_long(self, dt: datetime) -> None: |
There is no check here for datetime's zone. I think we need to validate that there is no zone. We may also need a write_timestamptz method. And can we rename this to write_timestamp?
I've removed these methods. The encoder should only accept the physical types. I've added the check to the write tree.
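A hedged sketch of the zone check being discussed; the helper name and its placement in the write tree are illustrative, not the PR's final code:

from datetime import datetime, timedelta

EPOCH = datetime(1970, 1, 1)


def timestamp_to_micros_checked(dt: datetime) -> int:
    # A plain `timestamp` column must be zone-naive; a zone-aware value belongs to
    # `timestamptz` and should go through a separate, UTC-adjusting path.
    if dt.tzinfo is not None:
        raise ValueError(f"Expected a timezone-naive datetime, got: {dt!r}")
    return (dt - EPOCH) // timedelta(microseconds=1)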
from pyiceberg.utils.datetime import date_to_days, datetime_to_micros, time_object_to_micros


class BinaryEncoder:
The decoder has a UUID method that is missing here. Might be good to add it so that we have a round-trip test.
Great one, I'll add it to the fastavro roundtrip test as well.
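A sketch of the missing UUID method, using a stand-in class (the real BinaryEncoder's write() appends raw bytes, as in the diff above); it mirrors the decoder's 16-byte fixed representation:

import io
import uuid


class _EncoderSketch:
    """Stand-in for the PR's BinaryEncoder: write() appends raw bytes to the stream."""

    def __init__(self, output_stream: io.BytesIO) -> None:
        self._output = output_stream

    def write(self, b: bytes) -> None:
        self._output.write(b)

    def write_uuid(self, u: uuid.UUID) -> None:
        # uuid.UUID.bytes is the 16-byte big-endian form, matching the decoder side.
        self.write(u.bytes)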
return (((t.hour * 60 + t.minute) * 60) + t.second) * 1_000_000 + t.microsecond


def time_object_to_micros(t: time) -> int:
Looks like we did a poor job naming the time_to_micros function: the other similar ones are named like date_str_to_days, so this one would be time_str_to_micros.
It would be nice to fix this by adding both types to time_to_micros and adding a time_str_to_micros method.
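A sketch of the string-input helper the naming scheme implies (the name and implementation are illustrative):

from datetime import time


def time_str_to_micros(value: str) -> int:
    # Parse an ISO time string (e.g. "12:30:15.000500") and convert it to microseconds from midnight.
    t = time.fromisoformat(value)
    return (((t.hour * 60 + t.minute) * 60) + t.second) * 1_000_000 + t.microsecond


assert time_str_to_micros("00:00:01") == 1_000_000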
| return {"type": "array", "element-id": self.last_list_field_id, "items": element_result} | ||
|
|
||
| def map(self, map_type: MapType, key_result: AvroType, value_result: AvroType) -> AvroType: | ||
| if isinstance(key_result, StringType): |
@Fokko, should we just use an Iceberg map in all cases? Why fall back to an Avro map?
If I read an Avro schema that has a list of records, that's harder for me to read than a native Avro map.
# Avro Maps does not support other keys than a String,
return {
    "type": "map",
    "values": value_result,
I think this needs to set key-id and value-id properties.
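A hedged sketch of what the string-keyed branch could return with those id annotations; the key_id and value_id parameters stand in for the field ids taken from the map type:

def string_keyed_map_schema(key_id: int, value_id: int, value_result: dict) -> dict:
    # Avro map with the Iceberg field-id annotations for key and value.
    return {
        "type": "map",
        "values": value_result,
        "key-id": key_id,
        "value-id": value_id,
    }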
def visit_timestamptz(self, timestamptz_type: TimestamptzType) -> AvroType:
    # Iceberg only supports micro's
    return {"type": "long", "logicalType": "timestamp-micros"}
This needs to set adjust-to-utc to true to signal that the value is a timestamptz.
Great catch, this actually uncovered another bug. Thanks!
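A sketch of the corrected return value, with adjust-to-utc marking the value as a timestamptz as the comment above asks (the helper name is illustrative):

def timestamptz_avro_type() -> dict:
    # adjust-to-utc distinguishes timestamptz from a plain timestamp.
    return {"type": "long", "logicalType": "timestamp-micros", "adjust-to-utc": True}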
| return "string" | ||
|
|
||
| def visit_uuid(self, uuid_type: UUIDType) -> AvroType: | ||
| return {"type": "string", "logicalType": "uuid"} |
UUIDs are stored as 16-byte fixed, not strings. See https://github.com/apache/iceberg/blob/master/format/spec.md#appendix-a-format-specific-requirements
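A sketch of the 16-byte fixed form the spec link describes; Avro fixed types require a name, and the one used here is illustrative:

def uuid_avro_type() -> dict:
    # 16-byte fixed with the uuid logical type, instead of a string.
    return {"type": "fixed", "size": 16, "logicalType": "uuid", "name": "uuid_fixed"}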
assert output.getbuffer() == struct.pack("??", True, False)


def test_write_int() -> None:
I think this should also have a suite of tests that validates round-trip serialization using encoder and decoder. That would have caught the UUID issue because the decoder is implemented correctly.
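A hedged sketch of such a round-trip test for the UUID case; the write/read helpers below are stand-ins for the PR's encoder and decoder methods, not their actual names:

import io
import uuid


def _write_uuid(stream: io.BytesIO, value: uuid.UUID) -> None:
    stream.write(value.bytes)  # stand-in for the encoder's UUID write method


def _read_uuid(stream: io.BytesIO) -> uuid.UUID:
    return uuid.UUID(bytes=stream.read(16))  # stand-in for the decoder's UUID read method


def test_uuid_round_trip() -> None:
    original = uuid.uuid4()
    buffer = io.BytesIO()
    _write_uuid(buffer, original)

    buffer.seek(0)
    assert _read_uuid(buffer) == original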
_5byte_input = 2510416930
_6byte_input = 734929016866
_7byte_input = 135081528772642
_8byte_input = 35124861473277986
Why are there no tests for negative values or odd numbers?
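For reference, Avro ints and longs are zig-zag varint encoded, so negative and odd values exercise different bit patterns than the even, positive byte-boundary inputs above. A self-contained round-trip over such values (an illustration of the encoding, not the PR's encoder):

def zigzag_varint_encode(n: int) -> bytes:
    zz = (n << 1) ^ (n >> 63)  # zig-zag: maps negatives to odd positives
    out = bytearray()
    while True:
        byte = zz & 0x7F
        zz >>= 7
        if zz:
            out.append(byte | 0x80)  # continuation bit: more bytes follow
        else:
            out.append(byte)
            return bytes(out)


def zigzag_varint_decode(data: bytes) -> int:
    zz = 0
    for shift, byte in enumerate(data):
        zz |= (byte & 0x7F) << (7 * shift)
    return (zz >> 1) ^ -(zz & 1)  # undo the zig-zag mapping


for value in (0, 1, -1, 3, -3, 2510416930, -2510416930, 35124861473277987, -35124861473277987):
    assert zigzag_varint_decode(zigzag_varint_encode(value)) == value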
@Fokko
This PR addresses issue #7255, adding code to:
It also adds tests for the added classes and function, as well as a direct validation with the fastavro library.