Flink: Add wrapper to adapt Row to StructLike #1175
Conversation
@openinx, now that the RC for 0.9.0 is out, I should be able to pick back up on Flink reviews tomorrow. I'll probably start with this one since we need to clean this up. Thanks!
@openinx, I opened an alternative to this PR, #1195. Please take a look. This solution looks fairly clean for producing a StructLike. The approach I took in the other PR is to reuse the existing accessors, which accept a StructLike.
Ping @rdblue for reviewing.
```java
}

private static PositionalGetter buildGetter(Type type) {
  if (type instanceof Types.StructType) {
```
The objects returned by this wrapper need to be Iceberg's internal representation:
- `int` for `DateType`: number of days from epoch
- `long` for `TimeType`: number of microseconds from midnight
- `long` for both `TimestampType`s: number of microseconds from epoch
- `ByteBuffer` for both `fixed(L)` and `binary` types
- `BigDecimal` for `decimal(P, S)`
Because Flink uses the same in-memory representation as Iceberg generics, this should use the same conversions that we use for Record.
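A minimal sketch of those conversions using plain `java.time` arithmetic; the class and method names below are illustrative, not Iceberg API:

```java
import java.nio.ByteBuffer;
import java.time.LocalDate;
import java.time.LocalDateTime;
import java.time.LocalTime;
import java.time.temporal.ChronoUnit;

// Illustrative conversions to Iceberg's internal representation.
public class InternalConversions {
  private static final LocalDate EPOCH_DAY = LocalDate.ofEpochDay(0);
  private static final LocalDateTime EPOCH = LocalDateTime.of(1970, 1, 1, 0, 0);

  // DateType -> int: days from epoch
  static int daysFromDate(LocalDate date) {
    return (int) ChronoUnit.DAYS.between(EPOCH_DAY, date);
  }

  // TimeType -> long: microseconds from midnight
  static long microsFromTime(LocalTime time) {
    return time.toNanoOfDay() / 1000;
  }

  // TimestampType -> long: microseconds from epoch
  static long microsFromTimestamp(LocalDateTime timestamp) {
    return ChronoUnit.MICROS.between(EPOCH, timestamp);
  }

  // fixed(L) and binary -> ByteBuffer
  static ByteBuffer bufferFromBytes(byte[] bytes) {
    return ByteBuffer.wrap(bytes);
  }

  // decimal(P, S) is already a BigDecimal, so no conversion is needed.
}
```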
Thanks for the details, we discussed this here, maybe you want to take a look :-) .
Yes, this needs to convert to the representation that internal classes use.
Iceberg's generic data model is intended for passing data to and from Java applications, which is why it uses friendlier classes. It is up to data models like Iceberg generics or Flink's data model to convert to that representation. Iceberg core should modify data as little as possible.
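For example, a hypothetical getter inside the Row wrapper could do the conversion at access time. `PositionalGetter` is the name from the diff above, but its exact shape here is an assumption, as is Flink storing date values as `java.time.LocalDate`:

```java
import java.time.LocalDate;
import java.time.temporal.ChronoUnit;
import org.apache.flink.types.Row;

// Assumed single-method shape for the PositionalGetter named in the diff.
interface PositionalGetter<T> {
  T get(Row row, int pos);
}

class DateGetters {
  // Converts a Flink LocalDate field to Iceberg's internal representation
  // (int days from epoch) as the field is read, keeping the conversion in
  // the Flink data model rather than in Iceberg core.
  static PositionalGetter<Integer> dateGetter() {
    return (row, pos) -> {
      LocalDate date = (LocalDate) row.getField(pos);
      return date == null
          ? null
          : (int) ChronoUnit.DAYS.between(LocalDate.ofEpochDay(0), date);
    };
  }
}
```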
```java
import org.junit.Assert;
import org.junit.Test;

public class TestPartitionKey {
```
Can you add a test based on Spark's TestPartitionValues? That tests every supported type, null values, and different column orders.
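A hedged sketch of what one such per-type case could look like. The `RowWrapper` constructor here is an assumption about this patch's API, not a confirmed signature; `PartitionKey` and `Row.of` are existing Iceberg and Flink classes:

```java
import org.apache.flink.types.Row;
import org.apache.iceberg.PartitionKey;
import org.apache.iceberg.PartitionSpec;
import org.apache.iceberg.Schema;
import org.apache.iceberg.types.Types;
import org.junit.Assert;
import org.junit.Test;

public class TestPartitionValuesSketch {
  private static final Schema SCHEMA = new Schema(
      Types.NestedField.optional(1, "id", Types.IntegerType.get()),
      Types.NestedField.optional(2, "data", Types.StringType.get()));

  private static final PartitionSpec SPEC =
      PartitionSpec.builderFor(SCHEMA).identity("data").build();

  @Test
  public void testNullPartitionValue() {
    // Assumed constructor; the wrapper added by this PR adapts Row to StructLike.
    RowWrapper wrapper = new RowWrapper(SCHEMA.asStruct());
    PartitionKey key = new PartitionKey(SPEC, SCHEMA);

    key.partition(wrapper.wrap(Row.of(1, null)));

    // A null source value should produce a null partition value, not a failure.
    Assert.assertNull(key.get(0, String.class));
  }
}
```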
```java
partitionKey2.partition(rowWrapper.wrap(row));
Assert.assertEquals(1, partitionKey2.size());
Assert.assertEquals(200, (int) partitionKey2.get(0, Integer.class));
Assert.assertEquals(partitionKey2.toPath(), "structType.innerIntegerType=200");
```
Can you split each of these blocks into a separate test case? There are lots of different cases mixed together in this method, which makes it harder to see what is broken when tests fail: you don't get a picture of what is common across the failed cases because many of them never run.
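For instance, the nested-struct block quoted above could become its own test; `rowWrapper`, `partitionKey2`, and `row` are assumed to move into shared setup:

```java
@Test
public void testNestedStructIdentityPartition() {
  partitionKey2.partition(rowWrapper.wrap(row));

  Assert.assertEquals(1, partitionKey2.size());
  Assert.assertEquals(200, (int) partitionKey2.get(0, Integer.class));
  // Expected value first, per JUnit's assertEquals(expected, actual) convention.
  Assert.assertEquals("structType.innerIntegerType=200", partitionKey2.toPath());
}
```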
Addressed all the comments. Please take another look, thanks @rdblue.
I reopened this to run tests against master with the CI fix. It's passing tests so I'll merge it. Thanks, @openinx!
This patch abstracts the common code of PartitionKey into the newly introduced class BasePartitionKey, and both the Spark PartitionKey and the Flink PartitionKey will extend this base class. It also provides unit tests for the Flink PartitionKey.
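A hypothetical outline of that refactor, assuming only `StructLike`'s three-method contract; the real fields and constructors live in the patch itself:

```java
import org.apache.iceberg.StructLike;

// Shared partition-tuple state and StructLike behavior, extracted so that
// the Spark and Flink PartitionKey subclasses only add engine-specific
// row access on top of it.
abstract class BasePartitionKey implements StructLike {
  private final Object[] partitionTuple;

  protected BasePartitionKey(int size) {
    this.partitionTuple = new Object[size];
  }

  @Override
  public int size() {
    return partitionTuple.length;
  }

  @Override
  public <T> T get(int pos, Class<T> javaClass) {
    return javaClass.cast(partitionTuple[pos]);
  }

  @Override
  public <T> void set(int pos, T value) {
    partitionTuple[pos] = value;
  }
}
```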