Data: Support reading default values from generic Avro readers #6004
Conversation
  return new NestedField(true, id, name, type, doc, null, null);
}

public static NestedField optional(
I thought we talked about making these package-private until the implementation was complete so that it doesn't leak into the public API before the values are read correctly? Is there some reason why that wouldn't work?
The reason is that I need to instantiate a NestedField with a default value in the TestReadDefaultValues class, but this class lives in the iceberg-data module, which is not the same package as Types.java in the API module, so package-private doesn't work.
I can annotate this method as VisibleForTesting and/or Beta; do you think that works?
Can you move the test class? We can't merge this unless these are not visible. Annotations don't change visibility.
It is hard to move the test class to another package since all the Avro dependencies would break in that case. I have updated the PR to make both optional and the NestedField constructor protected, and inherited the NestedField class in the test class. However, that has the risk of having to downgrade the NestedField constructor to private in the future once this workaround is no longer needed.
Another option is to keep both private and use reflection in the test to access them as a temporary workaround.
Kept the constructor private and the newly introduced NestedField APIs public.
        || MetadataColumns.metadataFieldIds().contains(field.fieldId()),
    "Missing required field that doesn't have a default value: %s",
    field.name());
// If the field from Iceberg schema has initial default value, we just pass and don't
Style: In Iceberg, don't use personal pronouns like "I" or "we" in documentation or comments because it makes documentation harder to understand (who is "we"?) and longer. Instead, be direct:
If the field has an initial default value, do not project it in the read schema. The default will be added in the Iceberg ValueReader.
// If the field from Iceberg schema has initial default value, we just pass and don't
// project it to the avro file read schema with the generated _r field name,
// the default value will be directly read from the Iceberg layer
if (field.initialDefault() != null) {
Why does the case for a null default add a field while this one does not? That seems strange to me. I think this should either add a fake field in both cases or not.
I think the question hinges on why the fake field is added for the other case. I think it has something to do with the record schema needing to match the Iceberg schema exactly -- the record schema should not be different. If that's the case then it should be added for the non-null initial default.
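For illustration, a minimal sketch of what such a placeholder could look like in Avro terms; the field name, field id, and types here are made up for this example:

import org.apache.avro.JsonProperties;
import org.apache.avro.Schema;

// an optional placeholder field the file will never contain; Avro fills it
// with null, and the Iceberg ValueReader can then substitute the default
Schema.Field placeholder = new Schema.Field(
    "data_r2",  // hypothetical "_r" + field-id style name from this discussion
    Schema.createUnion(Schema.create(Schema.Type.NULL), Schema.create(Schema.Type.STRING)),
    null,
    JsonProperties.NULL_VALUE);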
I see. I originally thought using continue could just be an implementation trick: although the output Avro schema of the BuildAvroProjection process lacks the field, that is taken care of when the schemas are compared again in the ValueReader.
I think I can make the code logic more consistent by still creating a fake field with some other suffix like "_d" instead of "_r". The "_d" field would not be projected from the real file either, and the default value read would still be taken care of by the ValueReader. WDYT?
What is the purpose of changing the name of the placeholder field that is added? Just use the same code that's already here.
I think it depends on the implementation of ValueReaders. In the case of defaults, ValueReaders takes care of filling in the default value. In the case of optionals, it is delegated to Avro to fill in the null values. I like the idea of doing it in the ValueReaders so it is consistent across all file formats, even when the file format does not support the semantics (e.g., the ORC schema does not support default values).
} else if (field.fieldId() == MetadataColumns.IS_DELETED.fieldId()) {
  positionList.add(pos);
  constantList.add(false);
} else if (record.getField(field.name()) == null && field.initialDefault() != null) {
I think the reason for leaving out the field in BuildAvroProjection was to make this check work. But I'm pretty sure that the Avro schema needs to match the Iceberg schema. So this will probably need some other way of detecting that the field will not be read from the file because it was missing from the file schema.
Sure. I suggested the above way of creating a "_d" field which can act as a marker, or I can use a custom Avro property, something like "<HAS_DEFAULT -> true>", to signify it. WDYT?
Passing information through field names is error prone because you don't control the names of real columns.
Instead, what about adding a metadata key to the Avro schema field that signals this?
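For illustration, a minimal sketch of the metadata-key approach; the property name and helper class are hypothetical, not existing Iceberg constants:

import org.apache.avro.Schema;

class DefaultValueMarkers {
  // hypothetical property key; Iceberg does not define this constant
  private static final String HAS_DEFAULT_PROP = "iceberg-field-has-default";

  // mark a projected placeholder field so value readers know its value
  // comes from the Iceberg default rather than from the data file
  static void markHasDefault(Schema.Field field) {
    field.addProp(HAS_DEFAULT_PROP, "true");
  }

  // check the marker when building the value readers
  static boolean hasDefault(Schema.Field field) {
    return "true".equals(field.getProp(HAS_DEFAULT_PROP));
  }
}

Unlike a name suffix, a schema property cannot collide with the name of a real column.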
In the latest patch, record is no longer required in this method. The default value is set here using a mechanism similar to positions and constants, and then it is used in the read() method. Removing the dependency on record simplified this API and a few others, as the Avro Schema record is no longer required in the signature (similar to before this patch).
| "Missing required field in nameMapping", | ||
| IllegalArgumentException.class, | ||
| "Missing required field: x", | ||
| "Missing required field that doesn't have a default value: x", |
"with no default value"
I think "Missing required field" is adequate. Based on where this error is placed, Missing field with no default value could be an ambiguous message.
I'm just saying that it should be "with no default value" rather than saying "doesn't have a default value". We try to be as direct as possible.
I am saying "Missing required field with no default" could give the impression that the fix is to add a default value (also, depending on how one reads it, it could mean the fix is to introduce both the field and the default value); in fact, the fix is either to add the field or to add a default value. Hence, if we leave it at "Missing required field", it could be adequate. We could also say "Missing required field, or missing default value".
  expected = ByteBuffers.toByteArray((ByteBuffer) expected);
}
if (actual instanceof ByteBuffer) {
  actual = ByteBuffers.toByteArray((ByteBuffer) actual);
This looks suspicious. Are you sure that there is a mismatch in the readers?
Yes, in Type.java, the fixed type is associated with ByteBuffer:
FIXED(ByteBuffer.class),
So theoretically, fixed-type data should always be of ByteBuffer type instead of byte array.
And here the problem is actually that RandomGenericData generates byte[] data for the fixed type (I tried to change that, but it resulted in more test failures because some existing Avro writers depend on it, and Avro can only write fixed-type data represented by byte[]).
So I instead added the conversion in this assertion helper class to convert the ByteBuffer back to byte[] for a compatible comparison.
Without the above conversion, the FixedType case in my TestReadDefaultValues will fail.
For the Iceberg generic in-memory representation, byte[] is correct. The reader should be producing byte[] and not ByteBuffer. The Iceberg type's parameter is for Iceberg internals, not the generic representation.
In the latest patch, this check is removed, and SingleValueParser moved to byte[]. However, it seems that Literals still uses ByteBuffer for FIXED type, so the discrepancy showed up again in SingleValueParser, where it needs to handle both byte[] and ByteBuffer inputs. There is a more detailed comment on this in SingleValueParser.
Also to clarify where the dependency on byte[] is coming from, it actually comes from DataTestHelpers. We can see here that the FIXED case fails the assertion if the data type is not byte[]. Hence, there is a conflict between DataTestHelpers which uses byte[] and Literals which uses ByteBuffer. Such a gap is bridged in SingleValueParser#toJson in the current PR.
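For illustration, a minimal sketch of that bridging, assuming a helper along these lines; the class and method names are illustrative:

import java.nio.ByteBuffer;
import org.apache.iceberg.util.ByteBuffers;

class FixedBytes {
  // normalize a FIXED value to byte[]: Literals supplies ByteBuffer,
  // while the generic in-memory representation uses byte[]
  static byte[] toByteArray(Object value) {
    if (value instanceof ByteBuffer) {
      return ByteBuffers.toByteArray((ByteBuffer) value);
    } else if (value instanceof byte[]) {
      return (byte[]) value;
    }
    throw new IllegalArgumentException("Cannot convert to fixed bytes: " + value);
  }
}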
@wmoustafa what is the failure?
If we keep SingleValueParser and DataTestHelpers without any change, the error is:
org.apache.iceberg.data.avro.TestReadDefaultValues > writeAndValidate FAILED
java.lang.AssertionError: [Expected should be a byte[]]
Expecting actual:
java.nio.HeapByteBuffer[pos=0 lim=2 cap=2]
to be an instance of:
byte[]
but was instance of:
java.nio.HeapByteBuffer
at org.apache.iceberg.data.DataTestHelpers.assertEquals(DataTestHelpers.java:93)
at org.apache.iceberg.data.DataTestHelpers.assertEquals(DataTestHelpers.java:39)
at org.apache.iceberg.data.avro.TestReadDefaultValues.writeAndValidate(TestReadDefaultValues.java:164)
Okay, I think this does indicate a bug.
You're reading into Iceberg generics, so the values should all match the classes created by Iceberg generics. Those are not the same as the internal classes used by Iceberg because they are intended to be friendlier. For example, Iceberg uses days from the Unix epoch to represent a date, but generics will return Java's LocalDate.
Here's a diff that fixes your tests by using the correct types for the expected values:
diff --git a/core/src/main/java/org/apache/iceberg/data/IdentityPartitionConverters.java b/core/src/main/java/org/apache/iceberg/data/IdentityPartitionConverters.java
index 4cb4126315..c19d75f000 100644
--- a/core/src/main/java/org/apache/iceberg/data/IdentityPartitionConverters.java
+++ b/core/src/main/java/org/apache/iceberg/data/IdentityPartitionConverters.java
@@ -18,9 +18,11 @@
*/
package org.apache.iceberg.data;
+import java.nio.ByteBuffer;
import org.apache.avro.generic.GenericData;
import org.apache.iceberg.types.Type;
import org.apache.iceberg.types.Types;
+import org.apache.iceberg.util.ByteBuffers;
import org.apache.iceberg.util.DateTimeUtil;
public class IdentityPartitionConverters {
@@ -46,7 +48,9 @@ public class IdentityPartitionConverters {
return DateTimeUtil.timestampFromMicros((Long) value);
}
case FIXED:
- if (value instanceof GenericData.Fixed) {
+ if (value instanceof ByteBuffer) {
+ return ByteBuffers.toByteArray((ByteBuffer) value);
+ } else if (value instanceof GenericData.Fixed) {
return ((GenericData.Fixed) value).bytes();
}
return value;
diff --git a/data/src/test/java/org/apache/iceberg/data/avro/TestReadDefaultValues.java b/data/src/test/java/org/apache/iceberg/data/avro/TestReadDefaultValues.java
index 0f98d2e571..0fb50566bf 100644
--- a/data/src/test/java/org/apache/iceberg/data/avro/TestReadDefaultValues.java
+++ b/data/src/test/java/org/apache/iceberg/data/avro/TestReadDefaultValues.java
@@ -31,6 +31,7 @@ import org.apache.iceberg.avro.Avro;
import org.apache.iceberg.avro.AvroIterable;
import org.apache.iceberg.data.DataTestHelpers;
import org.apache.iceberg.data.GenericRecord;
+import org.apache.iceberg.data.IdentityPartitionConverters;
import org.apache.iceberg.data.RandomGenericData;
import org.apache.iceberg.data.Record;
import org.apache.iceberg.io.FileAppender;
@@ -133,7 +134,7 @@ public class TestReadDefaultValues {
for (Record record : generatedRecords) {
Record expectedRecord = GenericRecord.create(readerSchema);
expectedRecord.set(0, record.get(0));
- expectedRecord.set(1, defaultValue);
+ expectedRecord.set(1, IdentityPartitionConverters.convertConstant(type, defaultValue));
expected.add(expectedRecord);
}The tests still fail, but now they show the real problem: default values are not being converted to match the object model. For partition constants, that is done in GenericReader:
Map<Integer, ?> partition =
    PartitionUtil.constantsMap(task, IdentityPartitionConverters::convertConstant);

We will need to do something similar for default values. I think what makes the most sense is to add a conversion function to StructReader that needs to be implemented by subclasses. That can call the existing conversion functions, like IdentityPartitionConverters.convertConstant or similar for Spark.
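For illustration, a rough sketch of that shape; the class and method names are illustrative, not the final API:

import org.apache.iceberg.data.IdentityPartitionConverters;
import org.apache.iceberg.types.Type;

abstract class ModelAwareStructReader {
  // each object model converts an internal default value (e.g. ByteBuffer
  // for fixed) into its own representation (e.g. byte[] for generics)
  protected abstract Object convertDefault(Type type, Object value);
}

class GenericStructReaderSketch extends ModelAwareStructReader {
  @Override
  protected Object convertDefault(Type type, Object value) {
    // Iceberg generics can reuse the partition-constant conversion
    return IdentityPartitionConverters.convertConstant(type, value);
  }
}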
That is some deep analysis with 3 levels of conversion to get it right :) I have directly called IdentityPartitionConverters.convertConstant from the StructReader constructor. Ideally, IdentityPartitionConverters should be renamed in a future release since it is no longer specific to partitions.
if (isDeletedColumnPos == null) {
  this.positions = new int[0];
  this.constants = new Object[0];
  this.constantValuesPositions = new int[0];
"ValuesPositions" is a bit awkward with multiple plural parts. I think it is also longer than needed because the variables would make sense as constantPositions and constantValues.
Used constantPositions, constantValues, defaultPositions, defaultValues for consistency.
| justification: "Deprecations for 1.0 release" | ||
| - code: "java.method.removed" | ||
| old: "method java.lang.Iterable<org.apache.iceberg.DataFile> org.apache.iceberg.Snapshot::deletedFiles()" | ||
| justification: "Deprecations for 1.0 release" |
Can you revert the changes that moved these around?
These were moved around due to the API change in NestedField. It should go away once we fix the access modifiers for optional and the constructor. Access modifiers were changed to accommodate keeping the new APIs private, but we found there was no good solution for that, so we will just go with making the new APIs public (as is the case in the end state). This will take care of keeping this file intact as well.
Right now this only contains:
- code: "java.class.defaultSerializationChanged"
  old: "class org.apache.iceberg.types.Types.NestedField"
  new: "class org.apache.iceberg.types.Types.NestedField"
  justification: "Serialization across versions is not supported"
I think this should be expected as we added a new API.
Yes, that sounds fine.
List<ValueReader<?>> readers,
StructType struct,
Schema record,
Map<Integer, ?> idToConstant) {
It looks like this change isn't needed, nor is the update to add Schema to createStructReader.
Reverted.
for (Record record : generatedRecords) {
  Record expectedRecord = GenericRecord.create(readerSchema);
  expectedRecord.set(0, record.get(0));
  expectedRecord.set(1, defaultValue);
I think the problem with fixed might be introduced here. Iceberg generics will not produce the same value as the internal representation. If you expect ByteBuffer here, the generics are going to produce ByteBuffer.
You could stop setting the reader func to DataReader::create below, but then the GenericRecord instances would not be produced. What I'd do here is add a convert method in this test to convert all of the types that don't match between the two, like timestamps, dates, and fixed.
Addressed per above discussion.
};

@Test
public void writeAndValidate() throws IOException {
I think we also want to write a file with explicit null values to validate that if the column is present we won't insert the initial default.
Added one more test case to test this.
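For reference, the new case boils down to something like the following sketch, where writerSchema, readerSchema, and writeAndRead are stand-ins for the test's schemas and appender/reader round trip, not real helpers:

// the "data" column is present in the file with an explicit null, so the
// reader must keep the null instead of applying the initial default
Record written = GenericRecord.create(writerSchema);
written.setField("id", 1L);
written.setField("data", null);

Record read = writeAndRead(writerSchema, readerSchema, written); // hypothetical helper
Assert.assertNull(read.getField("data"));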
Force-pushed from 1b036d9 to 70ae07b.
@rdblue Latest commit reverts the change in
List<Object> constantList = Lists.newArrayListWithCapacity(fields.size());
List<Integer> constantPositionsList = Lists.newArrayListWithCapacity(fields.size());
List<Object> constantValuesList = Lists.newArrayListWithCapacity(fields.size());
List<Integer> defaultValuesPositionList = Lists.newArrayListWithCapacity(fields.size());
Should be defaultPositionList
    set(struct, field.pos(), readers[field.pos()].read(decoder, reusedValue));
    existingFieldPositionsSet.add(field.pos());
  }
  // Set default values
Style: missing empty newline between control flow block and the following statement.
      set(struct, defaultPositions[i], defaultValues[i]);
    }
  }
Nit: unnecessary whitespace change.
// reader.
defaultValuesPositionList.add(pos);
defaultValuesList.add(
    IdentityPartitionConverters.convertConstant(field.type(), field.initialDefault()));
This fixes the problem for Iceberg generics, but this conversion is not valid for other in-memory data models, like Spark or Flink. Those also extend this class.
We either need to pass in the conversion method or pass in a map of default constants like the idToConstant map above. I think I'd prefer to pass in an idToDefault map that is used in the same way and produced from the schema.
Using a map will also help solve the problem of structs with nested defaults.
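For illustration, a sketch of producing such a map from the read schema, mirroring idToConstant. This walks only top-level columns (nested fields would need a schema visitor), assumes the initialDefault accessor added by this PR, and uses illustrative names; the conversion function is supplied per object model:

import java.util.Map;
import java.util.function.BiFunction;
import org.apache.iceberg.Schema;
import org.apache.iceberg.relocated.com.google.common.collect.Maps;
import org.apache.iceberg.types.Type;
import org.apache.iceberg.types.Types;

class DefaultsMap {
  static Map<Integer, Object> idToDefault(
      Schema schema, BiFunction<Type, Object, Object> convert) {
    Map<Integer, Object> defaults = Maps.newHashMap();
    // top-level columns only; nested struct fields would need a schema visitor
    for (Types.NestedField field : schema.columns()) {
      if (field.initialDefault() != null) {
        defaults.put(field.fieldId(), convert.apply(field.type(), field.initialDefault()));
      }
    }
    return defaults;
  }
}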
Also, in order to make sure that we are not producing incorrect results for the other object models, we will need to fail if a default value is needed by this class, but is unavailable.
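For illustration, that guard could look roughly like this, assuming the idToDefault map sketched above; a sketch, not the final code:

import org.apache.iceberg.relocated.com.google.common.base.Preconditions;

// fail fast when an object model did not supply a converted default,
// instead of silently producing an unconverted value
if (field.initialDefault() != null) {
  Preconditions.checkArgument(
      idToDefault.containsKey(field.fieldId()),
      "Cannot produce default value for field: %s", field.name());
}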
import org.junit.Test;
import org.junit.rules.TemporaryFolder;

public class TestReadDefaultValues {
When Spark and Flink support are added, we will need to have a similar test there.
}

@Test
public void testDefaultValueNotApplied() throws IOException {
Looks good.
@wmoustafa, looks like there are test failures. Can you take a look?
.palantir/revapi.yml
| old: "method void org.apache.iceberg.avro.ValueReaders.StructReader<S>::<init>(java.util.List<org.apache.iceberg.avro.ValueReader<?>>,\ | ||
| \ org.apache.iceberg.types.Types.StructType, java.util.Map<java.lang.Integer,\ | ||
| \ ?>)" | ||
| new: "method void org.apache.iceberg.avro.ValueReaders.StructReader<S>::<init>(java.util.List<org.apache.iceberg.avro.ValueReader<?>>,\ |
There's no need to break compatibility. Can you add a constructor that defaults the idToDefault map to ImmutableMap.of()?
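For illustration, a sketch of such an overload inside ValueReaders.StructReader, assuming the new constructor gained an idToDefault parameter:

import org.apache.iceberg.relocated.com.google.common.collect.ImmutableMap;

// binary-compatible overload: the old signature delegates to the new
// constructor with an empty defaults map
protected StructReader(
    List<ValueReader<?>> readers, Types.StructType struct, Map<Integer, ?> idToConstant) {
  this(readers, struct, idToConstant, ImmutableMap.of());
}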
Yes. Done.
Fixed.
@wmoustafa, I just opened a refactor that should make this a lot easier to get in: #9366 When I went to thoroughly review this, I had to do a lot of research into how |
private final int[] constantPositions;
private final Object[] constantValues;
private final int[] defaultPositions;
private final Object[] defaultValues;
If we go with the approach in #9366, then we should no longer need changes here, which was what originally made this difficult.
Now that #9366 is in, I think we should perform a similar refactor for the Iceberg generics and add default support that way. It should be much simpler!
This pull request has been marked as stale due to 30 days of inactivity. It will be closed in 1 week if no further activity occurs. If you think that's incorrect or this pull request requires a review, please simply write any comment. If closed, you can revive the PR at any time and @mention a reviewer or discuss it on the [email protected] list. Thank you for your contributions.
This pull request has been closed due to lack of activity. This is not a judgement on the merit of the PR in any way. It is just a way of keeping the PR queue manageable. If you think that is incorrect, or the pull request requires review, you can revive the PR at any time.
This PR implements support for generic Avro readers in the iceberg-data module to read default values, which is demonstrated via data/src/test/java/org/apache/iceberg/data/avro/TestReadDefaultValues.java. The PR is based on the API changes in #4732.
Subsequent PRs will mimic this approach to implement the same feature for (Spark/Flink) x (Avro/ORC/Parquet) readers.
@rdblue can you please take a look?