Update Flink Parquet reader and writer to use schema visitor #1237
Conversation
private static class FallbackReadBuilder extends ReadBuilder {
This can probably be done later, but I think we could rewrite the fallback read builder to use a visitor that is passed in. That way we could use the same one across sources:
static class FallbackReadBuilder extends TypeWithSchemaVisitor<ParquetValueReader<?>> {
  private final TypeWithSchemaVisitor<ParquetValueReader<?>> builder;

  FallbackReadBuilder(TypeWithSchemaVisitor<ParquetValueReader<?>> builder) {
    this.builder = builder;
  }

  @Override
  public ParquetValueReader<?> message(Types.StructType expected, MessageType message,
                                       List<ParquetValueReader<?>> fieldReaders) {
    // the top level matches by ID, but the remaining IDs are missing
    return builder.struct(expected, message, fieldReaders);
  }

  @Override
  public ParquetValueReader<?> struct(Types.StructType ignored, GroupType struct,
                                      List<ParquetValueReader<?>> fieldReaders) {
    // the expected struct is ignored because nested fields are never found
    ...
  }
}
Will do.
case INT_64:
case TIMESTAMP_MICROS:
  return new TimestampMicroReader(desc);
The timestamp reader should not be used for longs.
Done.
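For context, a minimal sketch of the split being requested inside the primitive-reader switch (UnboxedReader comes from Iceberg's ParquetValueReaders; TimestampMicroReader is the class from this PR):

case INT_64:
  // plain longs get a plain long reader, not the timestamp reader
  return new ParquetValueReaders.UnboxedReader<>(desc);
case TIMESTAMP_MICROS:
  return new TimestampMicroReader(desc);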
case INT_8:
case INT_16:
case INT_32:
case TIME_MICROS:
According to the table on #1215, a time should be in milliseconds. Because Parquet stores the time in micros for TIME_MICROS, this will need a converter.
@JingsongLi, how should we handle lossy conversions to Flink types?
Yes, unfortunately Flink can only store time in milliseconds. (Although the time type can be declared with nanosecond precision, the internal implementation only keeps milliseconds.)
Compared with throwing an exception when encountering a non-zero microsecond value (a single piece of dirty data would make the program very fragile), I prefer to just ignore the microsecond part. Although that is lossy, it is the most convenient behavior for users.
We need to document this behavior.
Done. FYI, I discard the microseconds when converting to milliseconds in #1266.
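A minimal sketch of that lossy read path, assuming the reader shape used elsewhere in this file (the class name is illustrative):

private static class LossyMicrosToMillisTimeReader extends ParquetValueReaders.UnboxedReader<Integer> {
  LossyMicrosToMillisTimeReader(ColumnDescriptor desc) {
    super(desc);
  }

  @Override
  public Integer read(Integer ignored) {
    // TIME_MICROS is an INT64 of microseconds; Flink's internal time type is an
    // int of milliseconds, so the microsecond remainder is deliberately dropped
    return (int) Math.floorDiv(readLong(), 1000L);
  }
}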
@Override
public TimestampData read(TimestampData ignored) {
  return TimestampData.fromEpochMillis(readLong() / 1000);
}
This should not discard microseconds. It should instead call TimestampData.fromEpochMillis(millis, nanosOfMilli).
Done.
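A sketch of the corrected conversion, keeping sub-millisecond precision (floorDiv/floorMod also handle timestamps before the epoch correctly):

@Override
public TimestampData read(TimestampData ignored) {
  long micros = readLong();
  long millis = Math.floorDiv(micros, 1000L);
  // remaining microseconds, expressed as nanoseconds within the millisecond
  int nanosOfMilli = (int) Math.floorMod(micros, 1000L) * 1000;
  return TimestampData.fromEpochMillis(millis, nanosOfMilli);
}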
case INT_8:
case INT_16:
case INT_32:
case TIME_MICROS:
Like the read path, time needs to be handled separately to convert from millis to micros.
Will update in the writer-side PR.
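A minimal sketch of the write-side converter, assuming the primitive-writer shape used in this file (the class name is illustrative):

private static class TimeMillisWriter extends ParquetValueWriters.PrimitiveWriter<Integer> {
  TimeMillisWriter(ColumnDescriptor desc) {
    super(desc);
  }

  @Override
  public void write(int repetitionLevel, Integer millis) {
    // Flink stores time as an int of milliseconds; the TIME_MICROS column
    // expects INT64 microseconds, so scale up on write
    column.writeLong(repetitionLevel, millis * 1_000L);
  }
}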
case INT_32:
case TIME_MICROS:
  return ints(sType, desc);
case INT_64:
The timestamp writer should not be used for longs.
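Mirroring the read side, a sketch of the split being asked for (ParquetValueWriters.longs is Iceberg's plain long writer; TimestampDataWriter is the class from this PR):

case INT_64:
  // plain longs get a plain long writer, not the timestamp writer
  return ParquetValueWriters.longs(desc);
case TIMESTAMP_MICROS:
  return new TimestampDataWriter(desc);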
private static class BytesReader extends ParquetValueReaders.PrimitiveReader<byte[]> {
This is identical to the reader in SparkParquetReaders. Can you move the reader to ParquetValueReaders and use the same implementation?
Done.
BigDecimal bigDecimal = decimal.toBigDecimal();
byte fillByte = (byte) (bigDecimal.signum() < 0 ? 0xFF : 0x00);
I think this should be rewritten to use an abstract implementation that defines write(int repetitionLevel, BigDecimal decimal). This is the same as the one in SparkParquetWriters, so we should avoid duplicating it.
PR #1265 seems to do that refactoring. I will rebase the writer PR once it is done.
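A sketch of the shared shape being proposed, with subclasses converting their engine-specific decimal to BigDecimal (names here are illustrative, not the final refactoring):

private abstract static class BaseFixedDecimalWriter<D> extends ParquetValueWriters.PrimitiveWriter<D> {
  private final int length;

  BaseFixedDecimalWriter(ColumnDescriptor desc, int precision) {
    super(desc);
    this.length = TypeUtil.decimalRequiredBytes(precision);
  }

  // subclasses call this after converting their decimal type to BigDecimal
  protected void writeDecimal(int repetitionLevel, BigDecimal decimal) {
    byte[] unscaled = decimal.unscaledValue().toByteArray();
    byte[] buf = new byte[length];
    // sign-extend the unscaled value to the fixed length of the column
    byte fillByte = (byte) (decimal.signum() < 0 ? 0xFF : 0x00);
    int offset = length - unscaled.length;
    for (int i = 0; i < length; i += 1) {
      buf[i] = i < offset ? fillByte : unscaled[i - offset];
    }
    column.writeBinary(repetitionLevel, Binary.fromReusedByteArray(buf));
  }
}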
private static class TimeStampDataWriter extends ParquetValueWriters.PrimitiveWriter<TimestampData> {
Typo: TimeStamp should be Timestamp
@Override
public void write(int repetitionLevel, TimestampData value) {
  column.writeLong(repetitionLevel, value.getMillisecond() * 1000);
}
This cannot ignore the nanoseconds of the timestamp value.
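A sketch of the corrected write, folding the sub-millisecond part back in (getMillisecond and getNanoOfMillisecond are Flink's TimestampData accessors):

@Override
public void write(int repetitionLevel, TimestampData value) {
  // combine milliseconds and the nanos-of-millisecond remainder into micros
  long micros = value.getMillisecond() * 1_000L + value.getNanoOfMillisecond() / 1_000;
  column.writeLong(repetitionLevel, micros);
}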
private void assertRowData(Type type, RowData expected, RowData actual) {
Can you move these to a TestHelpers class with methods like assertEqualsInternal for the internal representation?
Also, the tests miss conversions that discard microseconds because they generate internal representations (like RowData) and compare RowData to RowData. These tests should write generics and validate a generic record against a row. See the TestHelpers in Spark.
Done.
private static class RandomRowGenerator extends RandomGenericData.RandomDataGenerator<Row> {
private static class RandomRowDataGenerator extends TypeUtil.CustomOrderSchemaVisitor<Object> {
I think this should use the generator for generics and validate against generic data. That will catch more cases.
Done. Now the test writes generics to Parquet files, reads them back through the Flink Parquet reader, and compares each Record with the resulting RowData.
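A rough sketch of that test flow, assuming Iceberg's generic helpers (RandomGenericData, GenericParquetWriter) and this PR's FlinkParquetReaders; the comparison helper at the end is hypothetical:

List<Record> expected = RandomGenericData.generate(SCHEMA, 100, 19L);

try (FileAppender<Record> writer = Parquet.write(Files.localOutput(testFile))
    .schema(SCHEMA)
    .createWriterFunc(GenericParquetWriter::buildWriter)
    .build()) {
  writer.addAll(expected);
}

try (CloseableIterable<RowData> reader = Parquet.read(Files.localInput(testFile))
    .project(SCHEMA)
    .createReaderFunc(fileSchema -> FlinkParquetReaders.buildReader(SCHEMA, fileSchema))
    .build()) {
  Iterator<Record> expectedIter = expected.iterator();
  for (RowData actual : reader) {
    // compare the generic Record with the RowData produced by the Flink reader
    assertRowData(SCHEMA.asStruct(), expectedIter.next(), actual); // hypothetical helper
  }
}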
Thanks for working on this, @chenjunjiedada! You might consider separating this into read and write PRs to make it easier to review.

@rdblue, thanks a lot for your comments. Let me go through the comments and separate the patch.
This changes the current FlinkParquetReader/Writer to use a schema visitor.

As discussed in #1215, the current FlinkParquetReader/Writer are not built with a schema visitor, so they cannot recognize fields in the Flink data model (Row or RowData). This PR adds a ReaderBuilder and a WriterBuilder that extend the schema visitor, so value readers and writers can be built while visiting the schema.

This assumes the Flink internal data model is RowData rather than Row. It also adds a RandomRowDataGenerator, which generates random RowData, and an assertRowData assertion for the unit tests.