-
Notifications
You must be signed in to change notification settings - Fork 3k
Avro: Add internal writer #11919
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
Avro: Add internal writer #11919
Conversation
| return ValueReaders.doubles(); | ||
| case STRING: | ||
| return ValueReaders.strings(); | ||
| case FIXED: |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
As per Types.java, fixed also should be written and read as plain ByteBuffers.
Refer the testcase in TestInternalWriter
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I agree with this change.
Another check that this is correct is that ByteBuffer is used as the representation used by FixedLiteral, which must match the type of internal objects.
| try (AvroIterable<StructLike> reader = | ||
| Avro.read(file.toInputFile()) | ||
| .project(schema) | ||
| .createResolvingReader(InternalReader::create) |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
This gives GenericRecord, couldn't find a way to get the pure StructLike.
I cannot use .setCustomType to get StructProjection? It is read only class.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
It's okay that this produces GenericRecord that is a StructLike. There's no need to avoid it.
| import org.apache.avro.Schema; | ||
| import org.apache.iceberg.relocated.com.google.common.base.Preconditions; | ||
|
|
||
| abstract class BaseAvroSchemaVisitor extends AvroSchemaVisitor<ValueWriter<?>> { |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
It's true that this is a visitor, but I think it's a best practice to use the class name to convey the purpose of this particular visitor.
In this case, it's building a writer tree so I think a name like BaseWriteBuilder would be more appropriate (based on what was copied here). You were right to use Base to indicate that it is an abstract base class that is shared across object models. I might revise that suggestion when I read through more to see if this is more specific, too.
Another reason is that the name BaseAvroSchemaVisitor is misleading. This class builds a writer tree and has a specific purpose. It wouldn't be used as a base for a visitor that has a different purpose, but the name implies that it could be.
core/src/main/java/org/apache/iceberg/avro/BaseAvroSchemaVisitor.java
Outdated
Show resolved
Hide resolved
|
|
||
| @Override | ||
| protected ValueWriter<?> fixedWriter(int size) { | ||
| return ValueWriters.byteBuffers(); |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
This isn't correct for the write path. In the read path, we want to broadly accept values and let validation happen later, so reading directly as ByteBuffer and not checking the fixed length is okay. However in the write path we do need to validate that the ByteBuffer being written is the expected fixed size, which ValueWriters.byteBuffers() (ByteBufferWriter) doesn't do. However, FixedWriter and GenericFixedWriter do check:
@Override
public void write(byte[] bytes, Encoder encoder) throws IOException {
Preconditions.checkArgument(
bytes.length == length,
"Cannot write byte array of length %s as fixed[%s]",
bytes.length,
length);
encoder.writeFixed(bytes);
}I think this needs a new ByteBufferWriter for fixed buffers:
public static ValueWriter<ByteBuffer> fixedBuffers(int length) {
return new FixedByteBufferWriter(length);
}
private static class FixedByteBufferWriter implements ValueWriter<ByteBuffer> {
private final int length;
private FixedByteBufferWriter(int length) {
this.length = length;
}
@Override
public void write(ByteBuffer bytes, Encoder encoder) throws IOException {
Preconditions.checkArgument(
bytes.remaining() == length,
"Cannot write byte buffer of length %s as fixed[%s]",
bytes.remaining(),
length);
encoder.writeBytes(bytes);
}
}| import org.junit.jupiter.api.Test; | ||
| import org.junit.jupiter.api.io.TempDir; | ||
|
|
||
| public class TestInternalWriter { |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I don't think that this one test is enough validation for the internal writer. Other object models are testing using the DataTest set of tests and a writeAndValidate implementation that is basically what you have here.
To add that, you'll need to create a random data generator for internal data (like RandomAvroData and RandomGenericData) and a helper to validate the records after they are read (like DataTestHelpers.assertEquals).
core/src/test/java/org/apache/iceberg/avro/TestInternalWriter.java
Outdated
Show resolved
Hide resolved
| 13, Literal.of("12345678901234567890.1234567890").to(Types.DecimalType.of(38, 10)).value()); | ||
|
|
||
| StructProjection structProjection = StructProjection.create(schema, schema); | ||
| StructProjection row = structProjection.wrap(record); |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
No need for a projection. Using GenericRecord exercises the StructLike write path.
| DataFile dataFile = dataWriter.toDataFile(); | ||
|
|
||
| assertThat(dataFile.format()).as("Format should be Avro").isEqualTo(FileFormat.AVRO); | ||
| assertThat(dataFile.content()).as("Should be data file").isEqualTo(FileContent.DATA); |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
These assertions are testing the writer produced by Avro.writeData(...).build() rather than the object model. I don't think there's any need for them.
| .build()) { | ||
| writtenRecords = Lists.newArrayList(reader); | ||
| } | ||
| assertThat(writtenRecords).hasSize(1); |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
The style in the project is to leave an empty newline after control flow blocks for readability.
3b95d75 to
51e760f
Compare
|
@rdblue: Thanks a lot for the review. I have addressed all the comments. PR is ready. |
| public class TestInternalAvro extends AvroDataTest { | ||
| @Override | ||
| protected void writeAndValidate(Schema schema) throws IOException { | ||
| List<StructLike> expected = RandomAvroData.generateStructLike(schema, 100, 0L); |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
It would be better to use a new random seed.
core/src/test/java/org/apache/iceberg/avro/AvroTestHelpers.java
Outdated
Show resolved
Hide resolved
core/src/test/java/org/apache/iceberg/avro/AvroTestHelpers.java
Outdated
Show resolved
Hide resolved
core/src/test/java/org/apache/iceberg/avro/AvroTestHelpers.java
Outdated
Show resolved
Hide resolved
| } | ||
|
|
||
| public static List<Object> generateList( | ||
| Random random, Types.ListType list, Supplier<Object> elementResult) { |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Nit: the supplier names no longer make much sense in this context, but this is minor.
rdblue
left a comment
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I approved too soon and noticed the random seed problem. Please fix and then we can merge. Thanks, @ajantha-bhat!
| @Override | ||
| protected void writeAndValidate(Schema schema) throws IOException { | ||
| List<StructLike> expected = | ||
| RandomInternalData.generate(schema, 100, System.currentTimeMillis()); |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Tests should not randomly generate the random seed. We want tests to be predictable. We normally choose a number and hard-code it.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I see. Since the comment was to use "random" seed. I went with random.
Tests should not randomly generate the random seed
Is it because if the test fails, we can't know what seed was used? Maybe logging the seed can help to solve this problem. As of now I hardcoded it.
9135a03 to
2f4d8df
Compare
|
Rebased as I hit the flaky test (Flink) |
|
Would have been nice to fix the nit from the last review, but it isn't a blocker. Thanks, @ajantha-bhat! I'll merge. |
Follow up for #11108