Flink: Using RowData to avro reader and writer #1232
Conversation
Looks like this one needs to be rebased after the others from #1231 are merged.
bf08cf2 to 50d3b88 (force-pushed)
 * @param <P> Partner type.
 * @param <T> Return T.
 */
public abstract class AvroWithPartnerByStructureVisitor<P, T> {
I think this PR needs to be rebased now that #1235 is in, right?
}

static ValueReader<MapData> arrayMap(ValueReader<?> keyReader,
                                     ValueReader<?> valueReader) {
Nit: indentation is off.
@Override
public TimestampData read(Decoder decoder, Object reuse) throws IOException {
  // TODO Do we need to consider time zones.
Time zones are left to the processing engine. It is up to the engine to convert times to concrete values for storage and from concrete values for display. Iceberg's responsibility is to return the value without modification.
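For illustration, a minimal sketch of a reader that follows this policy (the class name is hypothetical; the conversion mirrors the diff context above): it reads an Avro timestamp-micros long and hands it back as Flink's TimestampData with no time-zone adjustment.

```java
import java.io.IOException;
import org.apache.avro.io.Decoder;
import org.apache.flink.table.data.TimestampData;

class MicrosToTimestampReader {
  // Splitting micros into epoch millis plus nanos-of-milli is a pure
  // representation change; no zone conversion happens here.
  TimestampData read(Decoder decoder, Object reuse) throws IOException {
    long micros = decoder.readLong();
    long mills = micros / 1000;
    int nanos = ((int) (micros % 1000)) * 1000;
    if (nanos < 0) {
      // correct truncation toward zero for negative timestamps
      nanos += 1_000_000;
      mills -= 1;
    }
    return TimestampData.fromEpochMillis(mills, nanos);
  }
}
```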
BigDecimal decimal = d.toBigDecimal();

byte fillByte = (byte) (decimal.signum() < 0 ? 0xFF : 0x00);
Can we move this logic into a common DecimalUtil method? I think we have quite a few copies of it.
Created #1265 for this.
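For context, a hedged sketch of what such a shared helper might look like (the method actually added by #1265 may differ in name and signature): it sign-extends the unscaled bytes of a BigDecimal into a fixed-length big-endian buffer, which is what Avro's fixed decimal encoding needs.

```java
import java.math.BigDecimal;
import java.util.Arrays;

public class DecimalUtil {
  private DecimalUtil() {
  }

  // Write the two's-complement unscaled value of `decimal` into `reuse`,
  // padding the leading bytes with 0xFF for negative values and 0x00 otherwise.
  // Validation that the value fits the target precision/scale is omitted in this sketch.
  public static byte[] toFixLengthBytes(BigDecimal decimal, byte[] reuse) {
    byte[] unscaled = decimal.unscaledValue().toByteArray();
    byte fillByte = (byte) (decimal.signum() < 0 ? 0xFF : 0x00);
    int offset = reuse.length - unscaled.length;
    Arrays.fill(reuse, 0, offset, fillByte);
    System.arraycopy(unscaled, 0, reuse, offset, unscaled.length);
    return reuse;
  }
}
```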
public TemporaryFolder temp = new TemporaryFolder();

@Override
protected void writeAndValidate(Schema schema) throws IOException {
  List<RowData> inputs = generateDataFromAvroFile(schema);
I see you generate the List<Record> first, then write it to the file appender, and finally read it back into a List<RowData>. Could we just use RandomData#generateRowData to produce those RowData?
- First, RandomData is currently incorrect for some types, such as arrays and timestamp with time zone.
- Second, using the Iceberg Avro writer tests format compatibility better.
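As illustration, a sketch of that write-with-generics / read-as-RowData flow. The reader class name and package (org.apache.iceberg.flink.data.FlinkAvroReader) are assumptions about what this PR adds; the builder calls follow the existing Iceberg Avro API.

```java
import java.io.File;
import java.io.IOException;
import java.util.ArrayList;
import java.util.List;
import org.apache.flink.table.data.RowData;
import org.apache.iceberg.Files;
import org.apache.iceberg.Schema;
import org.apache.iceberg.avro.Avro;
import org.apache.iceberg.data.Record;
import org.apache.iceberg.data.avro.DataWriter;
import org.apache.iceberg.flink.data.FlinkAvroReader;  // assumed class added by this PR
import org.apache.iceberg.io.CloseableIterable;
import org.apache.iceberg.io.FileAppender;

class AvroRoundTrip {
  static List<RowData> writeRecordsReadRowData(Schema schema, Iterable<Record> records, File file)
      throws IOException {
    // write generic Records with the Iceberg Avro writer
    try (FileAppender<Record> writer = Avro.write(Files.localOutput(file))
        .schema(schema)
        .createWriterFunc(DataWriter::create)
        .build()) {
      writer.addAll(records);
    }

    // read the same file back through the Flink object model
    List<RowData> rows = new ArrayList<>();
    try (CloseableIterable<RowData> reader = Avro.read(Files.localInput(file))
        .project(schema)
        .createReaderFunc(FlinkAvroReader::new)
        .build()) {
      reader.forEach(rows::add);
    }
    return rows;
  }
}
```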
private static Iterable<RowData> generateRowData(Schema schema, int numRecords,
    Supplier<RandomRowGenerator> supplier) {
  DataStructureConverter<Object, Object> converter =
      DataStructureConverters.getConverter(TypeConversions.fromLogicalToDataType(FlinkSchemaUtil.convert(schema)));
Here we may need to call converter.open(RandomData.class.getClassLoader()) to initialize the converter?
Yes, we could, but only StructuredObjectConverter implements open, and Flink does not support structured types yet (a structured type is not a RowType). I'll revert this method in RandomData since it is not used.
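For reference, a minimal sketch of what opening the converter before use would look like, mirroring the snippet quoted above rather than the final PR (which reverted this method); treating org.apache.flink.types.Row as the external type is an assumption.

```java
import org.apache.flink.table.data.RowData;
import org.apache.flink.table.data.conversion.DataStructureConverter;
import org.apache.flink.table.data.conversion.DataStructureConverters;
import org.apache.flink.table.types.DataType;
import org.apache.flink.table.types.utils.TypeConversions;
import org.apache.flink.types.Row;
import org.apache.iceberg.Schema;
import org.apache.iceberg.flink.FlinkSchemaUtil;

class RowDataConversionSketch {
  static RowData toRowData(Schema schema, Row externalRow) {
    DataType dataType = TypeConversions.fromLogicalToDataType(FlinkSchemaUtil.convert(schema));
    DataStructureConverter<Object, Object> converter = DataStructureConverters.getConverter(dataType);
    // only StructuredObjectConverter actually requires open(), but calling it is harmless
    converter.open(RowDataConversionSketch.class.getClassLoader());
    return (RowData) converter.toInternal(externalRow);
  }
}
```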
8792a28 to 977a2ce (force-pushed)
flink/src/main/java/org/apache/iceberg/flink/TaskWriterFactory.java (outdated comment thread, resolved)
int nanos = ((int) (micros % 1000)) * 1000;
if (nanos < 0) {
  nanos += 1_000_000;
  mills -= 1;
}
Here it's simpler to use floorDiv and floorMod:

long mills = Math.floorDiv(micros, 1000);
int nanos = Math.floorMod(micros, 1000) * 1000;

I wrote a simple benchmark; the Math.floor* versions are about 10% slower.
@openinx, that might influence fixing the timestamp types in ORC!
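For reference, a small self-contained comparison of the two conversions discussed in this thread; both split a micros value into epoch millis plus nanos-of-milli and agree for negative inputs.

```java
public class MicrosSplitComparison {
  public static void main(String[] args) {
    long micros = -1_999_123L;  // sample negative timestamp in microseconds

    // branch-based version from the PR
    long mills = micros / 1000;
    int nanos = ((int) (micros % 1000)) * 1000;
    if (nanos < 0) {
      nanos += 1_000_000;
      mills -= 1;
    }

    // floorDiv/floorMod version suggested in the review
    long floorMills = Math.floorDiv(micros, 1000);
    int floorNanos = (int) Math.floorMod(micros, 1000) * 1000;

    // both print mills=-2000, nanos=877000
    System.out.printf("branch:   mills=%d, nanos=%d%n", mills, nanos);
    System.out.printf("floorDiv: mills=%d, nanos=%d%n", floorMills, floorNanos);
  }
}
```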
flink/src/main/java/org/apache/iceberg/flink/data/FlinkValueReaders.java (outdated comment thread, resolved)
import org.apache.flink.table.types.logical.ZonedTimestampType;

-abstract class FlinkTypeVisitor<T> implements LogicalTypeVisitor<T> {
+public abstract class FlinkTypeVisitor<T> implements LogicalTypeVisitor<T> {
Does this need to be public? The only reference to FlinkTypeVisitor that I see in this PR is here, so I'm not sure why this is needed.
No need. I originally thought the reading and writing paths would rely on FlinkTypeVisitor.
  }
}

private static class ArrayWriter<T> implements ValueWriter<ArrayData> {
Eventually, we should refactor this into a base class for array data, so that the encoder parts are shared between Flink and Spark. Not something we should do right now, though.
@Test
public void testNormalData() throws IOException {
  testCorrectness(COMPLEX_SCHEMA, NUM_RECORDS, RandomData.generate(COMPLEX_SCHEMA, NUM_RECORDS, 19982));

private List<RowData> generateDataFromAvroFile(Schema schema) throws IOException {
I think it would be better to validate Flink RowData against generic Record. That's what we do in Spark tests, where we first write using generics (or Avro in older tests) and then validate that the records we read using the Spark object model are equivalent. By doing that, you not only test that RowData to disk and back to RowData works, but that the records are actually equivalent to another read format.
You are right, we should have an asserter for RowData and Record.
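A hedged sketch of such an asserter, handling only a flat schema of a few primitive types (the real helper in the follow-up PR would also cover nested and logical types):

```java
import java.util.List;
import org.apache.flink.table.data.RowData;
import org.apache.iceberg.Schema;
import org.apache.iceberg.data.Record;
import org.apache.iceberg.types.Types;
import org.junit.Assert;

public class RowDataAsserts {
  private RowDataAsserts() {
  }

  // Compare a generic Record (written by the Iceberg writer) against a RowData
  // (read back through the Flink object model), field by field.
  public static void assertEquals(Schema schema, Record expected, RowData actual) {
    List<Types.NestedField> fields = schema.columns();
    Assert.assertEquals("arity should match", fields.size(), actual.getArity());

    for (int i = 0; i < fields.size(); i++) {
      Object value = expected.get(i);
      switch (fields.get(i).type().typeId()) {
        case INTEGER:
          Assert.assertEquals("int should match", value, actual.getInt(i));
          break;
        case LONG:
          Assert.assertEquals("long should match", value, actual.getLong(i));
          break;
        case STRING:
          Assert.assertEquals("string should match", value, actual.getString(i).toString());
          break;
        default:
          throw new UnsupportedOperationException("not handled in this sketch: " + fields.get(i).type());
      }
    }
  }
}
```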
@JingsongLi, this looks ready to go so I merged it. I think we can still improve some of the tests by validating the read and write paths separately and comparing records against Iceberg generics. But I believe that @chenjunjiedada is working on the validations or assert methods in another PR so we can get that done later. Thanks for working on this, it looks great.
Thanks @rdblue for your patient review; I will continue to follow and take part in the follow-up improvements.
Fixes #1231