-
Notifications
You must be signed in to change notification settings - Fork 3k
Implement the parquet value reader & writer for apache flink #1125
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
Conversation
|
Ping @rdblue , Thanks. |
|
Thanks! I'll give this a thorough review tomorrow. |
|
|
||
| @Test | ||
| public void testCorrectness() throws IOException { | ||
| int numRows = 2500; |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
How high have you tested? I usually test locally with ~1m rows.
Could you also add tests with the new generation methods that @samarthjain added for Parquet vectorization? Those allow you to generate data that will be dictionary encoded and that will fall back to non-dictionary after a few dictionary pages.
iceberg/spark/src/test/java/org/apache/iceberg/spark/data/RandomData.java
Lines 99 to 106 in 705da1b
| public static Iterable<Record> generateFallbackData(Schema schema, int numRecords, long seed, long numDictRecords) { | |
| return newIterable(() -> new FallbackDataGenerator(schema, seed, numDictRecords), schema, numRecords); | |
| } | |
| public static Iterable<GenericData.Record> generateDictionaryEncodableData( | |
| Schema schema, int numRecords, long seed, float nullPercentage) { | |
| return newIterable(() -> new DictionaryEncodedDataGenerator(schema, seed, nullPercentage), schema, numRecords); | |
| } |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
OK, let me think about how to add those unit tests. Thanks.
| return rows; | ||
| } | ||
|
|
||
| private static class RandomDataGenerator extends TypeUtil.CustomOrderSchemaVisitor<Object> { |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Should this inherit from the one for Iceberg generics to avoid duplicating the list and map methods? Primitive and struct will need to be implemented, but this could reuse a lot there as well.
| } | ||
| } | ||
|
|
||
| public interface StructWriterFactory { |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Instead of adding this interface, what about adding WriteBuilder.createStructWriter to do this? That way, the Flink builder could just inherit from the generic builder and override that one method.
I think that would be cleaner because adding this interface requires also adding public methods to pass the factory. I'd rather not add those public methods if we can avoid it by adding a protected method and change the builder to a protected class.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I guess we will also need to mark the WriteBuilder from private to public because it will be accessed by flink classes from outside package.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
It could be protected, right?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
protected is not enough for the outside flink package, the WriterBuilder is a static class in org.apache.iceberg.data.parquet , it must be public so that other packages could inherit it.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
You're right. The class should be public, but the method that will be overridden should be protected.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Em, I've implemented two versions. the first version provides a public interfaces with protected StructureWriterFactory and StructReaderFactory, the second version use the inherit WriterBuilder & ReaderBuilder #1125. For me , seems the first version looks much more concise. I plan to change to version#1 (with the public interface and protected methods ). Thanks.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Could you post the branch with the inheritance version? I'd like to see it to compare. I don't like the extra public classes and methods, and I think that inheritance would be a cleaner public API. I'm curious why you think the other approach looks more concise, though.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
The inheritance version is here.
I'm curious why you think the other approach looks more concise, though.
Because the inheritance version will need to expose the buildReader and buildWriter logic , for example this. If we change those logic, will also need to change in flink reader/writers. seems the same logic with the GenericParquetReader.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Okay, how about inheriting from the outer class, then? That way, the interface and the method that accepts the factory could be protected. Mainly, I don't think these should be public.
| StructReader<T, T> create(List<Type> types, List<ParquetValueReader<?>> readers, StructType struct); | ||
| } | ||
|
|
||
| static class RecordReader extends StructReader<GenericRecord, GenericRecord> { |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Why change this to use the GenericRecord instance rather than the Record interface?
I don't see much value in this change. We will just need to change it back if we want to add implementations of Record that are not generic, like we do with our internal classes that extend Avro's IndexedRecord. Ideally, I'd like to change those over to use our generics readers eventually.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I changed this because the buildReader method return the GenericRecord ParquetValueReader. Seems better to change the ParquetValueReader to be Record reader.
| } | ||
|
|
||
| static class RecordReader extends StructReader<Record, Record> { | ||
| public interface StructReaderFactory<T> { |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Similar to the write path, I think it would be nice to refactor this to avoid exposing new public methods and interfaces.
|
Thanks @openinx! Overall this looks pretty close to ready to go in. I like that we can reuse so much between Flink and Iceberg generics. |
|
|
||
| public interface StructWriterFactory { | ||
|
|
||
| StructWriter<?> create(List<ParquetValueWriter<?>> writers); |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I tried to mark this method to be protected but seems java8 don't allow to do that....
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Interface methods are always public.
|
|
||
| private RandomDataGenerator(long seed) { | ||
| this.random = new Random(seed); | ||
| private static class RandomRecordDataGenerator extends RandomDataGenerator<Record> { |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
How about RandomRecordGenerator instead of RandomRecordDataGenerator? I think that's more descriptive since it returns random records.
| import static org.apache.iceberg.types.Types.NestedField.required; | ||
|
|
||
| public class TestFlinkParquetReaderWriter { | ||
| private static final int NUM_RECORDS = 1_000_000; |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Good to know that this passes, but I don't think we need to run with this high of a record count every time in CI. Could you reduce this to 20_000?
|
@openinx, I think this is ready to commit other than the public interfaces and methods to pass struct factories. I'd like to make those protected or go with the inheritance approach. Personally, I think the inheritance approach is cleaner and I don't mind the method you pointed out, but either way I think can work. |
| } | ||
|
|
||
| @SuppressWarnings("unchecked") | ||
| public static ParquetValueReader<Row> buildRowReader(Schema expectedSchema, |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I changed the buildReader to buildRowReader because the parent buildReader will return with a ParquetValueReader <Record> data type, which clashes with this FlinkParquetReaders 's buildReader returned ParquetValueReader <Row>.
|
Thanks, @openinx! I appreciate you going with the inheritance approach for now. I'll prototype the alternative I suggested above, since I think that addresses your duplication concern while not making the factory interface public. I'm merging this so that it unblocks you and we can find a good solution to that problem in parallel. |
No description provided.