Flink: support to RowData partition. #1299
Conversation
private static PositionalGetter<?> buildGetter(LogicalType logicalType, Type type) {
  switch (type.typeId()) {
    case STRING:
      return (row, pos) -> row.getString(pos).toString();
Iceberg requires strings to be CharSequence, not necessarily String. So if you have UTF8 data, you can potentially just wrap it to produce a CharSequence rather than building an immutable JVM string.
Not a blocker, just something to keep in mind for the future.
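For illustration, a minimal sketch of such a lazy wrapper, assuming Flink's `StringData` API; the class name is hypothetical and not part of this patch:

```java
import org.apache.flink.table.data.StringData;

// Hypothetical wrapper: defers materializing an immutable JVM String
// until the CharSequence is actually inspected.
class LazyStringData implements CharSequence {
  private final StringData data;
  private String decoded; // decoded on first access

  LazyStringData(StringData data) {
    this.data = data;
  }

  private String str() {
    if (decoded == null) {
      decoded = data.toString();
    }
    return decoded;
  }

  @Override
  public int length() {
    return str().length();
  }

  @Override
  public char charAt(int index) {
    return str().charAt(index);
  }

  @Override
  public CharSequence subSequence(int start, int end) {
    return str().subSequence(start, end);
  }

  @Override
  public String toString() {
    return str();
  }
}
```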
      return (row, pos) -> row.getDecimal(pos, decimalType.getPrecision(), decimalType.getScale()).toBigDecimal();

    case TIME:
      return (row, pos) -> (long) row.getInt(pos);
This needs to be in microseconds, not milliseconds. We should probably include a comment about it as well.
You are right. Flink's time type is in milliseconds, but here we need microseconds. I will add unit tests to address this bug.
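A sketch of the corrected getter, assuming Flink stores TIME as an `int` of milliseconds of the day while Iceberg expects a `long` of microseconds:

```java
case TIME:
  // Flink's TIME is millis-of-day (int); Iceberg expects micros-of-day (long).
  return (row, pos) -> ((long) row.getInt(pos)) * 1000;
```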
      LocalZonedTimestampType lzTs = (LocalZonedTimestampType) logicalType;
      return (row, pos) -> {
        TimestampData timestampData = row.getTimestamp(pos, lzTs.getPrecision());
        return timestampData.getMillisecond() * 1000 + Math.floorDiv(timestampData.getNanoOfMillisecond(), 1000);
Nano of millisecond is always positive, right? In that case there is no need for floorDiv.
It's true.
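Since `getNanoOfMillisecond()` only returns values in `[0, 999_999]`, the getter can use plain division; a sketch of the simplified version:

```java
LocalZonedTimestampType lzTs = (LocalZonedTimestampType) logicalType;
return (row, pos) -> {
  TimestampData timestampData = row.getTimestamp(pos, lzTs.getPrecision());
  // nanoOfMillisecond is non-negative, so plain division is equivalent to floorDiv
  return timestampData.getMillisecond() * 1000 + timestampData.getNanoOfMillisecond() / 1000;
};
```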
  }
}

private static Object transform(Object value, Type type) {
This has the same time bug as the getter. I think it would be better to avoid deriving expected values and just hard-code them. That makes tests easier to read, and more reliable.
An alternative to hard-coding is to validate against a different object model. For example, you could generate the data with Iceberg generics, convert them to RowData, and then validate that the partitions produced from both object models match. That would catch the time bug and would also allow you to avoid needing to generate random RowData.
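A hypothetical sketch of that cross-model test; `RandomGenericData` and `InternalRecordWrapper` come from Iceberg's data module, while `convertToRowData` is an assumed helper that maps a generic `Record` to a Flink `RowData`:

```java
PartitionKey expectedKey = new PartitionKey(spec, schema);
PartitionKey actualKey = new PartitionKey(spec, schema);
InternalRecordWrapper recordWrapper = new InternalRecordWrapper(schema.asStruct());
RowDataWrapper rowDataWrapper = new RowDataWrapper(FlinkSchemaUtil.convert(schema), schema.asStruct());

for (Record record : RandomGenericData.generate(schema, 100, 1234L)) {
  RowData rowData = convertToRowData(record); // assumed helper

  // Partition the same logical row through both object models.
  expectedKey.partition(recordWrapper.wrap(record));
  actualKey.partition(rowDataWrapper.wrap(rowData));

  Assert.assertEquals("Partitions from both object models should match", expectedKey, actualKey);
}
```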
Thanks @openinx! It mostly looks good, except for the time bug. I'd also like to update the tests so that they are more likely to catch similar bugs.
    case TIME:
      // Iceberg's time is in microseconds, while Flink's time is in milliseconds.
      LocalTime localTime = (LocalTime) object;
      return (int) TimeUnit.NANOSECONDS.toMillis(localTime.toNanoOfDay());
Here we truncate the localTime to milliseconds, which erases the microseconds part. That is to say, the partition value will differ between Record and RowData because of the lost microseconds. Should we disable the TIME type as a partition key when using the Flink sink connector, to avoid the partition value mismatch?
The following unit test demonstrates this: https://github.com/apache/iceberg/pull/1299/files#diff-97304b05e2faea4a749031f514361a70R193
I don't think it is necessary to disable it, because any data written by Flink will necessarily be a millisecond-precision value. Partitioning is still correct with respect to the data that was written, because all of the data has millisecond values.
For the same time-typed data, if Flink writes it into an Iceberg table A and Hive MR or Spark reads it, there should be no problem. But if Flink and Spark each write the same data set into different tables A and B, then tables A and B will differ because of the lost microseconds. Those differences sound reasonable, since they come from the different behaviors of different compute engines.
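To make the truncation concrete, a small arithmetic example using `java.time.LocalTime` (the values are illustrative):

```java
LocalTime t = LocalTime.ofNanoOfDay(12_345_678_000L);              // 00:00:12.345678
long flinkMillis = TimeUnit.NANOSECONDS.toMillis(t.toNanoOfDay()); // 12345: the 678 micros are dropped
long icebergMicros = flinkMillis * 1000;                           // 12_345_000, not 12_345_678
```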
Looks good to me. Thanks @openinx!
As we have decided to change from `Row` to `RowData` as the internal row for reading and writing parquet/avro/orc files, in theory we will update all of `FlinkAvroReader`, `FlinkAvroWriter`, `FlinkParquetReaders`, `FlinkParquetWriters`, and `TaskWriterFactory` to use `RowData`.

In this patch, I've introduced a new `RowDataWrapper` to partition `RowData`. In the future (once one of the Flink `RowData` readers and writers for parquet/avro/orc is ready), we will remove the deprecated `RowWrapper`.
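For context, a sketch of how the new wrapper slots into partitioning; the loop and variable names are illustrative rather than the exact call sites in this patch:

```java
RowType rowType = FlinkSchemaUtil.convert(icebergSchema);
RowDataWrapper wrapper = new RowDataWrapper(rowType, icebergSchema.asStruct());
PartitionKey partitionKey = new PartitionKey(spec, icebergSchema);

for (RowData row : rows) {
  // Extract the partition values for this row via the wrapper.
  partitionKey.partition(wrapper.wrap(row));
  // route the row to the task writer responsible for this partition key
}
```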