diff --git a/core/src/main/java/org/apache/iceberg/avro/PruneColumns.java b/core/src/main/java/org/apache/iceberg/avro/PruneColumns.java index 85a2684bff15..decca0384ba1 100644 --- a/core/src/main/java/org/apache/iceberg/avro/PruneColumns.java +++ b/core/src/main/java/org/apache/iceberg/avro/PruneColumns.java @@ -22,6 +22,7 @@ import java.util.List; import java.util.Map; import java.util.Set; +import org.apache.avro.JsonProperties; import org.apache.avro.Schema; import org.apache.avro.SchemaNormalization; import org.apache.iceberg.mapping.NameMapping; @@ -69,6 +70,13 @@ public Schema record(Schema record, List&lt;String&gt; names, List&lt;Schema&gt; fields) { hasChange = true; } + if (isOptionSchemaWithNonNullFirstOption(field.schema())) { + // if the field has an optional schema where the first option is not NULL, + // we update hasChange flag to make sure we reorder the schema and make the + // NULL option as the first + hasChange = true; + } + Schema fieldSchema = fields.get(field.pos()); // All primitives are selected by selecting the field, but map and list // types can be selected by projecting the keys, values, or elements. @@ -247,8 +255,17 @@ private static Schema copyRecord(Schema record, List&lt;Schema.Field&gt; newFields) { } private static Schema.Field copyField(Schema.Field field, Schema newSchema, Integer fieldId) { + Schema newSchemaReordered; + // if the newSchema is an optional schema, make sure the NULL option is always the first + if (isOptionSchemaWithNonNullFirstOption(newSchema)) { + newSchemaReordered = AvroSchemaUtil.toOption(AvroSchemaUtil.fromOption(newSchema)); + } else { + newSchemaReordered = newSchema; + } + // do not copy over default values as the file is expected to have values for fields already in the file schema Schema.Field copy = new Schema.Field(field.name(), - newSchema, field.doc(), field.defaultVal(), field.order()); + newSchemaReordered, field.doc(), + AvroSchemaUtil.isOptionSchema(newSchemaReordered) ? 
JsonProperties.NULL_VALUE : null, field.order()); for (Map.Entry&lt;String, Object&gt; prop : field.getObjectProps().entrySet()) { copy.addProp(prop.getKey(), prop.getValue()); } @@ -265,4 +282,8 @@ private static Schema.Field copyField(Schema.Field field, Schema newSchema, Inte return copy; } + + private static boolean isOptionSchemaWithNonNullFirstOption(Schema schema) { + return AvroSchemaUtil.isOptionSchema(schema) &amp;&amp; schema.getTypes().get(0).getType() != Schema.Type.NULL; + } } diff --git a/core/src/test/java/org/apache/iceberg/avro/TestAvroOptionsWithNonNullDefaults.java b/core/src/test/java/org/apache/iceberg/avro/TestAvroOptionsWithNonNullDefaults.java new file mode 100644 index 000000000000..13efc18a7727 --- /dev/null +++ b/core/src/test/java/org/apache/iceberg/avro/TestAvroOptionsWithNonNullDefaults.java @@ -0,0 +1,128 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. 
+ */ + +package org.apache.iceberg.avro; + +import java.io.File; +import java.io.IOException; +import java.util.List; +import org.apache.avro.Schema; +import org.apache.avro.file.DataFileWriter; +import org.apache.avro.generic.GenericData; +import org.apache.avro.generic.GenericDatumWriter; +import org.apache.iceberg.Files; +import org.apache.iceberg.relocated.com.google.common.collect.ImmutableList; +import org.apache.iceberg.relocated.com.google.common.collect.Lists; +import org.junit.Assert; +import org.junit.Rule; +import org.junit.Test; +import org.junit.rules.TemporaryFolder; + +import static org.apache.avro.Schema.Type.INT; +import static org.apache.avro.Schema.Type.LONG; +import static org.apache.avro.Schema.Type.NULL; + +public class TestAvroOptionsWithNonNullDefaults { + + @Rule + public TemporaryFolder temp = new TemporaryFolder(); + + @Test + public void writeAndValidateOptionWithNonNullDefaultsPruning() throws IOException { + Schema writeSchema = Schema.createRecord("root", null, null, false, + ImmutableList.of( + new Schema.Field("field", Schema.createUnion(Schema.createArray(Schema.create(INT)), Schema.create(NULL)), + null, ImmutableList.of()) + ) + ); + + GenericData.Record record1 = new GenericData.Record(writeSchema); + record1.put("field", ImmutableList.of(1, 2, 3)); + GenericData.Record record2 = new GenericData.Record(writeSchema); + record2.put("field", null); + + File testFile = temp.newFile(); + Assert.assertTrue("Delete should succeed", testFile.delete()); + + try (DataFileWriter&lt;GenericData.Record&gt; writer = new DataFileWriter&lt;&gt;(new GenericDatumWriter&lt;&gt;())) { + writer.create(writeSchema, testFile); + writer.append(record1); + writer.append(record2); + } + + List&lt;GenericData.Record&gt; expected = ImmutableList.of(record1, record2); + + org.apache.iceberg.Schema readIcebergSchema = AvroSchemaUtil.toIceberg(writeSchema); + List&lt;GenericData.Record&gt; rows; + try (AvroIterable&lt;GenericData.Record&gt; reader = Avro.read(Files.localInput(testFile)) + .project(readIcebergSchema).build()) { + rows = Lists.newArrayList(reader); + } + + 
for (int i = 0; i &lt; expected.size(); i += 1) { + AvroTestHelpers.assertEquals(readIcebergSchema.asStruct(), expected.get(i), rows.get(i)); + } + } + + @Test + public void writeAndValidateOptionWithNonNullDefaultsEvolution() throws IOException { + Schema writeSchema = Schema.createRecord("root", null, null, false, + ImmutableList.of( + new Schema.Field("field", Schema.createUnion(Schema.create(INT), Schema.create(NULL)), null, -1) + ) + ); + + GenericData.Record record1 = new GenericData.Record(writeSchema); + record1.put("field", 1); + GenericData.Record record2 = new GenericData.Record(writeSchema); + record2.put("field", null); + + File testFile = temp.newFile(); + Assert.assertTrue("Delete should succeed", testFile.delete()); + + try (DataFileWriter&lt;GenericData.Record&gt; writer = new DataFileWriter&lt;&gt;(new GenericDatumWriter&lt;&gt;())) { + writer.create(writeSchema, testFile); + writer.append(record1); + writer.append(record2); + } + + Schema readSchema = Schema.createRecord("root", null, null, false, + ImmutableList.of( + new Schema.Field("field", Schema.createUnion(Schema.create(LONG), Schema.create(NULL)), null, -1L) + ) + ); + + GenericData.Record expectedRecord1 = new GenericData.Record(readSchema); + expectedRecord1.put("field", 1L); + GenericData.Record expectedRecord2 = new GenericData.Record(readSchema); + expectedRecord2.put("field", null); + List&lt;GenericData.Record&gt; expected = ImmutableList.of(expectedRecord1, expectedRecord2); + + org.apache.iceberg.Schema readIcebergSchema = AvroSchemaUtil.toIceberg(readSchema); + List&lt;GenericData.Record&gt; rows; + try (AvroIterable&lt;GenericData.Record&gt; reader = Avro.read(Files.localInput(testFile)) + .project(readIcebergSchema).build()) { + rows = Lists.newArrayList(reader); + } + + for (int i = 0; i &lt; expected.size(); i += 1) { + AvroTestHelpers.assertEquals(readIcebergSchema.asStruct(), expected.get(i), rows.get(i)); + } + } +}