Flink: Replace Row with RowData in flink write path. #1320
Conversation
this.targetFileSizeBytes = targetFileSizeBytes;
this.format = format;
this.appenderFactory = new FlinkFileAppenderFactory(schema, tableProperties);
this.flinkSchema = FlinkSchemaUtil.convert(schema);
In Spark, we had a bug where Spark may produce a row with a short, which is stored as an int in Iceberg. In a CTAS query, data would actually get passed to Iceberg with the short and we would end up with a ClassCastException. That's why we now pass the dataset schema when creating writers. You might want to watch out for a similar bug.
Thanks for the reminder, I think the bug you mentioned is #999. Flink doesn't support CTAS, but it does support `INSERT INTO iceberg_table SELECT * FROM table_2`. If table_2 has a TINYINT or SMALLINT column, the values in the BinaryRowData produced by the SELECT will be byte or short, so we also need the raw Flink schema (rather than the Flink schema converted from the Iceberg schema) to read the values from BinaryRowData and write those byte or short values as integers. Let me consider how to fix this.
This only appears in Spark with a CTAS query because that's the only time that Spark doesn't get a schema back from the table. When Spark has a table schema, it will automatically insert casts to the appropriate types so this problem doesn't happen. I'm not sure if Flink does that, but if it does then you wouldn't need to worry about that bug.
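To make the failure mode concrete, here is a small standalone sketch, assuming standard Flink `GenericRowData` behavior; this is not code from the PR, just an illustration of how a SMALLINT value trips up the INT accessor implied by the Iceberg-converted schema:

```java
import org.apache.flink.table.data.GenericRowData;
import org.apache.flink.table.data.RowData;

// Standalone sketch (not code from this PR): a SMALLINT column arrives as a Short,
// so it must be read through the source table's Flink type and widened explicitly.
public class ShortWideningSketch {
  public static void main(String[] args) {
    GenericRowData generic = new GenericRowData(1);
    generic.setField(0, (short) 42);   // value produced for a SMALLINT column
    RowData row = generic;

    // Reading with the source Flink type works; widening short -> int is implicit.
    int widened = row.getShort(0);
    System.out.println(widened);       // 42

    try {
      // Reading with the type converted back from Iceberg (INT) fails, because
      // the stored object is a Short, not an Integer.
      System.out.println(row.getInt(0));
    } catch (ClassCastException e) {
      System.out.println("ClassCastException: " + e.getMessage());
    }
  }
}
```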
private static PositionalGetter<?> buildGetter(LogicalType logicalType, Type type) {
  switch (type.typeId()) {
    case STRING:
      switch (logicalType.getTypeRoot()) {
I changed the parameter from the Iceberg type to Flink's logical type here because the values for TINYINT and SMALLINT are byte and short; casting a byte or short directly to Integer here would throw a ClassCastException. Using the logical type lets us widen them to integer the right way.
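A hypothetical sketch of that idea follows. The `PositionalGetter` shape is assumed from the method signature shown above, and the widening logic is illustrative rather than the PR's actual implementation:

```java
import org.apache.flink.table.data.RowData;
import org.apache.flink.table.types.logical.LogicalType;

// Sketch only: key the getter on the Flink logical type so TINYINT/SMALLINT values
// (Byte/Short inside RowData) are widened to the Integer an Iceberg INT column expects.
interface PositionalGetter<T> {
  T get(RowData row, int pos);
}

final class IntGetters {
  private IntGetters() {
  }

  static PositionalGetter<Integer> forIcebergInt(LogicalType flinkType) {
    switch (flinkType.getTypeRoot()) {
      case TINYINT:
        return (row, pos) -> (int) row.getByte(pos);   // widen byte -> int
      case SMALLINT:
        return (row, pos) -> (int) row.getShort(pos);  // widen short -> int
      case INTEGER:
        return RowData::getInt;
      default:
        throw new UnsupportedOperationException("Cannot read an INT column from " + flinkType);
    }
  }
}
```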
  return new UUID(mostSigBits, leastSigBits);
};
case VARBINARY:
  if (Type.TypeID.UUID.equals(type.typeId())) {
I think an identity check would be okay since this is an enum symbol, but either way is fine.
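As a tiny illustration of that point (the helper class here is made up), both forms behave the same because enum constants are singletons:

```java
import org.apache.iceberg.types.Type;

// Illustration only: reference comparison and equals() are interchangeable
// for an enum constant such as Type.TypeID.UUID.
final class UuidCheck {
  private UuidCheck() {
  }

  static boolean isUuid(Type type) {
    return type.typeId() == Type.TypeID.UUID;
    // equivalent: Type.TypeID.UUID.equals(type.typeId())
  }
}
```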
ByteBuffer bb = ByteBuffer.wrap(row.getBinary(pos));
long mostSigBits = bb.getLong();
long leastSigBits = bb.getLong();
return new UUID(mostSigBits, leastSigBits);
Looks like another area where we should have a util method to convert (though it shouldn't block this commit).
It's true, we could have a separate pull request for this.
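A minimal sketch of such a shared helper, assuming made-up names (`UUIDUtil.fromBytes`) that are not part of this PR:

```java
import java.nio.ByteBuffer;
import java.util.UUID;

// Sketch of the shared conversion helper suggested above; names are illustrative.
final class UUIDUtil {
  private UUIDUtil() {
  }

  static UUID fromBytes(byte[] bytes) {
    ByteBuffer bb = ByteBuffer.wrap(bytes);
    long mostSigBits = bb.getLong();
    long leastSigBits = bb.getLong();
    return new UUID(mostSigBits, leastSigBits);
  }
}
```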
TimestampType timestampType = (TimestampType) logicalType;
return (row, pos) -> {
  LocalDateTime localDateTime = row.getTimestamp(pos, timestampType.getPrecision()).toLocalDateTime();
  return DateTimeUtil.microsFromTimestamp(localDateTime);
Why not use the same logic here that is used in the other timestamp type? Both of the values are TimestampData that is returned by getTimestamp. It seems like converting directly to a microsecond value is better than going through LocalDateTime here.
Yeah, converting the TimestampData to a long could be the same in both cases. I separated them because the timestamp types are different, and we depend on TimestampType.getPrecision() or LocalZonedTimestampType.getPrecision() to get the precision (we could use the constant 6 here, but it's better to use the type's precision getter).
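For reference, the direct conversion without the intermediate LocalDateTime could look like the sketch below; the helper name is hypothetical, while the `TimestampData` accessors used are the standard `getMillisecond()` and `getNanoOfMillisecond()`:

```java
import org.apache.flink.table.data.TimestampData;

// Illustrative helper, not the code in this PR: convert TimestampData straight
// to microseconds instead of going through LocalDateTime.
final class TimestampMicros {
  private TimestampMicros() {
  }

  static long microsFrom(TimestampData ts) {
    // TimestampData exposes milliseconds plus the nanoseconds within that millisecond.
    return ts.getMillisecond() * 1_000L + ts.getNanoOfMillisecond() / 1_000L;
  }
}
```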
for (RowData row : rows) {
  Integer id = row.isNullAt(0) ? null : row.getInt(0);
  String data = row.isNullAt(1) ? null : row.getString(1).toString();
  records.add(createRecord(id, data));
This could also use the assertEquals implementation that @chenjunjiedada has added in #1266. That would be better than converting a specific record type.
Makes sense.
I have a few minor comments, but I don't think any of those should block this going in. I think it is correct. We can make the tests better in follow-ups by using the assertEquals helpers mentioned above.
Merged. Thanks for working on this, @openinx!
This patch addresses the third point in issue #1305.

After this patch, only `FlinkParquetReaders`, `FlinkParquetWriters`, `RandomData`, and `TestFlinkParquetReaderWriter` will reference the `Row` data type. Once we merge the pull requests for the RowData parquet readers (#1266) and parquet writers (#1272), there should be no other core classes that use the `Row` data type.