Add IcebergSerDe #1103
Conversation
This is required for #933 so that we can write proper integration tests for the InputFormat using this SerDe.
Resolved (outdated) review threads on:
mr/src/main/java/org/apache/iceberg/mr/mapred/IcebergSchemaToTypeInfo.java
mr/src/main/java/org/apache/iceberg/mr/mapred/IcebergWritable.java
mr/src/main/java/org/apache/iceberg/mr/mapred/SystemTableUtil.java
    protected static String getVirtualColumnName(Properties properties) {
When are properties used and when is configuration used? I'm surprised that we need both.
Yeah, we agree. We discovered this when adding the SerDe: the InputFormat uses Configuration but the SerDe only uses Properties, and we wanted to reuse the methods across both classes. It seemed simpler to overload a method rather than create a new Properties from the Configuration in the InputFormat.
Although that is exactly what we're doing in the TableResolver class... :')
The configuration contains all the configs we set in HiveConf, and possibly the Hadoop conf as well. The properties are the merged result of the Hive table and partition properties. We can see how Hive uses these in the initialize method of AbstractSerDe:

```java
public void initialize(Configuration configuration, Properties tableProperties,
                       Properties partitionProperties) throws SerDeException {
  initialize(configuration,
      SerDeUtils.createOverlayedProperties(tableProperties, partitionProperties));
}
```
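As a minimal sketch of the overlay semantics (partition properties taking precedence over table properties), using plain java.util.Properties. The `overlay` method here is a hypothetical stand-in for what SerDeUtils.createOverlayedProperties produces, not Hive's actual implementation:

```java
import java.util.Properties;

public class OverlaySketch {

    // Hypothetical stand-in: table properties become defaults,
    // partition properties override them.
    static Properties overlay(Properties table, Properties partition) {
        Properties merged = new Properties(table); // table props are the fallback
        partition.stringPropertyNames()
                 .forEach(k -> merged.setProperty(k, partition.getProperty(k)));
        return merged;
    }

    public static void main(String[] args) {
        Properties table = new Properties();
        table.setProperty("format", "parquet");
        table.setProperty("owner", "hive");

        Properties partition = new Properties();
        partition.setProperty("format", "orc"); // overrides the table-level value

        Properties merged = overlay(table, partition);
        System.out.println(merged.getProperty("format")); // orc
        System.out.println(merged.getProperty("owner"));  // hive
    }
}
```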
    private static TypeInfo generateTypeInfo(Type type) throws Exception {
It looks like this would be easier to implement using the type visitors, which already have the logic to traverse a schema. A good example is converting a Type to Spark's DataType.
That looks way simpler, I'll get started on that
I already have a type visitor somewhere from Schema to ObjectInspector. I can also submit that one on my PR so you can focus on the remaining things to do.
Yeah, +1 for a visitor. We have a TypeInfo to Iceberg Type visitor for inspiration: https://github.com/linkedin/iceberg/blob/master/hive/src/main/java/org/apache/iceberg/hive/legacy/HiveTypeToIcebergType.java
@guilload that would be great, thank you!
Still work in progress, things missing are mostly unit tests, but this is what it'll look like:
guilload@3bffa7c
That looks promising, happy to move that in here when you're done if the others agree. Thanks!
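For anyone following along, the visitor approach being discussed can be sketched with a toy type hierarchy like the one below. The Type and Visitor classes are simplified stand-ins invented for illustration; Iceberg's real type visitors differ in detail:

```java
import java.util.Arrays;
import java.util.List;
import java.util.stream.Collectors;

// Toy sketch of converting a nested schema with a visitor, in the spirit of
// the Type-to-DataType converters mentioned above. Not Iceberg's actual API.
public class TypeToTypeInfoSketch {

  interface Type {
    <T> T accept(Visitor<T> visitor);
  }

  interface Visitor<T> {
    T primitive(String name);
    T list(T elementResult);
    T struct(List<String> names, List<T> fieldResults);
  }

  static Type primitive(String name) {
    return new Type() {
      public <T> T accept(Visitor<T> v) { return v.primitive(name); }
    };
  }

  static Type list(Type element) {
    return new Type() {
      public <T> T accept(Visitor<T> v) { return v.list(element.accept(v)); }
    };
  }

  static Type struct(List<String> names, List<Type> fields) {
    return new Type() {
      public <T> T accept(Visitor<T> v) {
        return v.struct(names,
            fields.stream().map(f -> f.accept(v)).collect(Collectors.toList()));
      }
    };
  }

  // One concrete visitor: renders Hive-style type strings.
  static final Visitor<String> TO_HIVE_STRING = new Visitor<String>() {
    public String primitive(String name) { return name; }
    public String list(String element) { return "array<" + element + ">"; }
    public String struct(List<String> names, List<String> fields) {
      StringBuilder sb = new StringBuilder("struct<");
      for (int i = 0; i < names.size(); i++) {
        if (i > 0) sb.append(',');
        sb.append(names.get(i)).append(':').append(fields.get(i));
      }
      return sb.append('>').toString();
    }
  };

  public static void main(String[] args) {
    Type schema = struct(
        Arrays.asList("id", "tags"),
        Arrays.asList(primitive("bigint"), list(primitive("string"))));
    System.out.println(schema.accept(TO_HIVE_STRING));
    // struct<id:bigint,tags:array<string>>
  }
}
```

The payoff is that the traversal is written once, and each target representation (TypeInfo, ObjectInspector, DataType) is just another Visitor implementation.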
Resolved (outdated) review threads on:
mr/src/main/java/org/apache/iceberg/mr/mapred/IcebergSchemaToTypeInfo.java
mr/src/main/java/org/apache/iceberg/mr/mapred/IcebergSerDe.java
mr/src/test/java/org/apache/iceberg/mr/mapred/TestIcebergSchemaToTypeInfo.java
mr/src/test/java/org/apache/iceberg/mr/mapred/TestIcebergSerDe.java
    IcebergSerDe serDe = new IcebergSerDe();
    List<Object> deserialized = (List<Object>) serDe.deserialize(writable);
    Map result = (Map) deserialized.get(0);
Should this use the object inspectors?
Hello @guilload, if we give you write access as an external collaborator to our fork of Iceberg at https://github.com/ExpediaGroup/iceberg, would that make it easier to get your changes into this PR?

Yes, please! GitHub won't let me open a PR from my repo to yours, or fork your repo either.
    try {
      table = TableResolver.resolveTableFromConfiguration(configuration, serDeProperties);
    } catch (IOException e) {
      throw new UncheckedIOException("Unable to resolve table from configuration: ", e);
I didn't realize Java added an UncheckedIOException in 8. We have one that is RuntimeIOException. We should probably convert Iceberg over to using the standard Java one.
    @Override
    public byte[] getPrimitiveJavaObject(Object o) {
      return o == null ? null : ((ByteBuffer) o).array();
This isn't correct because it doesn't follow the contract of ByteBuffer. Avro will reuse byte buffers, so there is no guarantee that this array is the correct length. In addition, we want to generally follow the ByteBuffer contract so that we don't need to worry about whether an optimization later (buffer reuse) will break certain sections of code.
An easy fix is to use ByteBuffers.toByteArray here.
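To make the contract issue concrete, the sketch below shows a sliced buffer whose array() exposes the whole backing array, and a copy that respects position, limit, and arrayOffset, similar in spirit to the ByteBuffers.toByteArray helper mentioned above:

```java
import java.nio.ByteBuffer;
import java.util.Arrays;

public class ByteBufferCopySketch {

  // Copies exactly the bytes between position and limit,
  // without disturbing the caller's position.
  static byte[] toByteArray(ByteBuffer buffer) {
    byte[] result = new byte[buffer.remaining()];
    buffer.duplicate().get(result);
    return result;
  }

  public static void main(String[] args) {
    byte[] backing = {0, 1, 2, 3, 4, 5, 6, 7};
    // A reused buffer may hand out a window into a larger array:
    ByteBuffer window = ByteBuffer.wrap(backing, 2, 3).slice();

    System.out.println(Arrays.toString(window.array()));      // the whole backing array!
    System.out.println(Arrays.toString(toByteArray(window))); // [2, 3, 4]
  }
}
```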
    case TIME:
    case UUID:
    default:
      throw new IllegalArgumentException(primitiveType.typeId() + " type is not supported");
Couldn't fixed be read as binary? And UUID as a string? And doesn't Hive support time?
First two done. As for TIME: Hive supports DATE and TIMESTAMP; I don't know enough about Iceberg's types to comment on how these differ from TIME, but I'm guessing Hive doesn't support it?
    @Override
    public int getFieldID() {
@omalley, is the Iceberg field ID suitable to return as a Hive field ID here?
    @Override
    public int hashCode() {
      return 31 * field.hashCode() + oi.hashCode();
We typically prefer Objects.hash to this older pattern.
    implements TimestampObjectInspector {

    private static final IcebergTimestampObjectInspector INSTANCE_WITH_ZONE =
        new IcebergTimestampObjectInspector(o -> ((OffsetDateTime) o).toLocalDateTime());
Minor: It seems like this would be a bit cleaner if the outer class was abstract and these were anonymous classes with an implementation for LocalDateTime convert(Object o) or something similar. Using Function is okay, but seems like it uses functions to avoid normal inheritance.
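A rough sketch of what that refactor could look like. The class and constant names below are illustrative stand-ins, not the PR's actual API:

```java
import java.time.LocalDateTime;
import java.time.OffsetDateTime;
import java.time.ZoneOffset;

// Abstract outer class with a convert() hook; the two variants become
// anonymous subclasses instead of Functions passed to a constructor.
abstract class TimestampInspectorSketch {

  abstract LocalDateTime convert(Object o);

  static final TimestampInspectorSketch WITHOUT_ZONE = new TimestampInspectorSketch() {
    LocalDateTime convert(Object o) {
      return (LocalDateTime) o;
    }
  };

  static final TimestampInspectorSketch WITH_ZONE = new TimestampInspectorSketch() {
    LocalDateTime convert(Object o) {
      return ((OffsetDateTime) o).toLocalDateTime();
    }
  };

  public static void main(String[] args) {
    OffsetDateTime odt = OffsetDateTime.of(2020, 6, 1, 12, 0, 0, 0, ZoneOffset.UTC);
    System.out.println(WITH_ZONE.convert(odt)); // 2020-06-01T12:00
  }
}
```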
    public class TestIcebergBinaryObjectInspector {

      @Test
      public void testIcebergBinaryObjectInspector() {
It would be nice to have more cases in this test suite:
- When the buffer's limit is less than array().length
- When the buffer's arrayOffset is non-zero
- When the buffer's position is non-zero
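For reference, the three tricky buffers suggested above can be constructed like this (an illustrative snippet, not the actual test code):

```java
import java.nio.ByteBuffer;

public class BufferCases {
  public static void main(String[] args) {
    byte[] data = {0, 1, 2, 3, 4, 5};

    // limit smaller than array().length
    ByteBuffer limited = ByteBuffer.wrap(data, 0, 3);

    // non-zero arrayOffset (a slice of a larger buffer)
    ByteBuffer offset = ByteBuffer.wrap(data, 2, 3).slice();

    // non-zero position
    ByteBuffer positioned = ByteBuffer.wrap(data);
    positioned.position(4);

    System.out.println(limited.remaining());    // 3
    System.out.println(offset.arrayOffset());   // 2
    System.out.println(positioned.remaining()); // 2
  }
}
```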
    @Override
    public DateWritable getPrimitiveWritableObject(Object o) {
      Date date = getPrimitiveJavaObject(o);
      return date == null ? null : new DateWritable(date);
Instead of converting to Date and then wrapping with DateWritable, could we use the DateWritable constructor that accepts an integer? That would be more direct and we could convert using DateTimeUtil.daysFromDate(localDate) that we use elsewhere.
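A small sketch of the direct conversion being suggested. The `daysFromDate` helper below mirrors what an epoch-day conversion like Iceberg's DateTimeUtil.daysFromDate computes; the resulting int is what the integer-accepting DateWritable constructor would take:

```java
import java.time.LocalDate;

public class EpochDaysSketch {

  // Days since the Unix epoch (1970-01-01); what Hive's
  // DateWritable(int) constructor expects.
  static int daysFromDate(LocalDate date) {
    return (int) date.toEpochDay();
  }

  public static void main(String[] args) {
    LocalDate date = LocalDate.of(1970, 1, 11);
    System.out.println(daysFromDate(date)); // 10
  }
}
```

This avoids materializing an intermediate java.sql.Date object entirely.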
I had a few questions, but overall this looks good. The only blocker is how
* Refactor TestIcebergObjectInspector
* Inherit from AbstractPrimitiveJavaObjectInspector rather than IcebergPrimitiveObjectInspector
* Avoid creating an intermediate Date object
* Fix IcebergRecordStructField.equals
* Use inheritance to implement static Timestamp object inspectors
* Handle UUID type as String
* Handle fixed type as binary (byte array)
@rdblue, thanks for the comments in the review; we've implemented or replied to all of them. Could you please take another look?
    public class TestIcebergObjectInspector {

      private int id = 0;
Why was this introduced? It seems like relying on the same execution order between the schema creation and the test methods is brittle.
I'd prefer to move back to fixed IDs since that's easier to test and more clear in assertions.
That made my life easier when adding new fields, but I get your point; I'll fix it in a follow-up PR.
@rdblue, @guilload, @massdosage: since there are many open items, does it make sense to create a milestone with all the open tickets? That way they can be worked on in parallel and we don't duplicate effort or step on each other's toes.
Sounds good to me. You should be able to create and edit milestones.
    TableResolver.resolveTableFromConfiguration(conf);
    }

    @Test(expected = NullPointerException.class)
We usually prefer using AssertHelpers.assertThrows here, but this is minor since you don't need to check that other state has not been modified after the failure.
Since the above is almost certainly going to be refactored when we merge more logic between the two InputFormats, we can tackle this then.
I agree. I didn't want to block progress on Hive just for this.
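For readers unfamiliar with the pattern, here is a generic sketch of an assertThrows-style helper. Iceberg's AssertHelpers has a similar utility, though its actual signature differs; this version is invented for illustration:

```java
public class AssertThrowsSketch {

  // Runs the action and returns the thrown exception so the caller can make
  // further assertions on it (message, cause, state not modified, etc.).
  static <T extends Throwable> T assertThrows(Class<T> expected, Runnable action) {
    try {
      action.run();
    } catch (Throwable t) {
      if (expected.isInstance(t)) {
        return expected.cast(t);
      }
      throw new AssertionError("Unexpected exception type: " + t.getClass(), t);
    }
    throw new AssertionError(
        "Expected " + expected.getSimpleName() + " but nothing was thrown");
  }

  public static void main(String[] args) {
    NullPointerException npe = assertThrows(NullPointerException.class,
        () -> { throw new NullPointerException("boom"); });
    System.out.println(npe.getMessage()); // boom
  }
}
```

Compared to `@Test(expected = ...)`, this pins the failure to one statement and lets the test inspect the exception afterwards.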
I had a couple of minor comments, but we can fix those later. I'm going to merge this to unblock the next steps. Thanks @cmathiesen, @massdosage, @guilload, and everyone that reviewed!
    package org.apache.iceberg.mr.mapred;
Can we put the Hive classes in org.apache.iceberg.hive? This is already committed, but are you OK with this refactor?
I thought the convention was that the package name needed to match the subproject name, and this isn't in the hive subproject, but maybe that's not the case? Alternatively, they could go in org.apache.iceberg.mr.hive?
I prefer org.apache.iceberg.mr.hive
mr.hive sounds good to me.
I don't think we need to worry too much about this kind of refactor right now. We expect it to change rapidly as we build. We'll include a note in any release about how it is experimental and subject to change.
OK, my preference would be to leave this as it is for now and then do a review of all the packaging once we have the StorageHandler and InputFormat merged.
Hello! This is part 1 of our series of PRs to add the mapred InputFormat to support reading tables from Hive. This was initially meant to only include the IcebergSerDe, but we had to add a few more classes to get it working properly. @rdblue @massdosage @teabot