diff --git a/api/src/main/java/org/apache/iceberg/types/PruneColumns.java b/api/src/main/java/org/apache/iceberg/types/PruneColumns.java
index 2ef9bfd8b9..51838eb240 100644
--- a/api/src/main/java/org/apache/iceberg/types/PruneColumns.java
+++ b/api/src/main/java/org/apache/iceberg/types/PruneColumns.java
@@ -52,11 +52,11 @@ public Type struct(Types.StructType struct, List<Type> fieldResults) {
       } else if (projectedType != null) {
         sameTypes = false; // signal that some types were altered
         if (field.isOptional()) {
-          selectedFields.add(
-              Types.NestedField.optional(field.fieldId(), field.name(), projectedType, field.doc()));
+          selectedFields.add(Types.NestedField.optional(
+              field.fieldId(), field.name(), projectedType, field.getDefaultValue(), field.doc()));
         } else {
-          selectedFields.add(
-              Types.NestedField.required(field.fieldId(), field.name(), projectedType, field.doc()));
+          selectedFields.add(Types.NestedField.required(
+              field.fieldId(), field.name(), projectedType, field.getDefaultValue(), field.doc()));
         }
       }
     }
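For context, a minimal sketch of the factory behavior this projection change relies on (the field id, name, and -1 default below are illustrative, not taken from the patch): required(...) now tolerates a null default, so pruning can pass field.getDefaultValue() through unconditionally, whether or not the field actually carries a default.

    // a required field may now be created with or without a default value
    Types.NestedField withDefault =
        Types.NestedField.required(1, "id", Types.IntegerType.get(), -1, "doc");
    Types.NestedField withoutDefault =
        Types.NestedField.required(1, "id", Types.IntegerType.get(), null, "doc");
    // hasDefaultValue() distinguishes the two:
    // withDefault.hasDefaultValue() is true, withoutDefault.hasDefaultValue() is false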
be an instance of Map for MapType"); @@ -494,10 +490,18 @@ private static void validateDefaultValue(Object defaultValue, Type type) { NestedField.validateDefaultValue(e.getValue(), type.asMapType().valueField.type); } break; + + case FIXED: + case BINARY: + Preconditions.checkArgument(byte[].class.isInstance(defaultValue), + "defaultValue should be an instance of byte[] for TypeId.%s, but defaultValue.class = %s", + type.typeId().name(), defaultValue.getClass().getCanonicalName()); + break; + default: Preconditions.checkArgument(type.typeId().javaClass().isInstance(defaultValue), - "defaultValue should be of same java class of the type, defaultValue class: " + defaultValue.getClass() + - ", type class: " + type.typeId().javaClass()); + "defaultValue should be and instance of %s for TypeId.%s, but defaultValue.class = %s", + type.typeId().javaClass(), type.typeId().name(), defaultValue.getClass().getCanonicalName()); } } diff --git a/api/src/test/java/org/apache/iceberg/types/TestDefaultValuesForContainerTypes.java b/api/src/test/java/org/apache/iceberg/types/TestDefaultValuesForContainerTypes.java index 560426fce3..bc68fe1da6 100644 --- a/api/src/test/java/org/apache/iceberg/types/TestDefaultValuesForContainerTypes.java +++ b/api/src/test/java/org/apache/iceberg/types/TestDefaultValuesForContainerTypes.java @@ -21,7 +21,9 @@ import java.util.ArrayList; import java.util.Arrays; +import java.util.HashMap; import java.util.List; +import java.util.Map; import org.junit.Assert; import org.junit.BeforeClass; import org.junit.Test; @@ -44,9 +46,9 @@ public static void beforeClass() { @Test public void testStructTypeDefault() { - List structDefaultvalue = new ArrayList<>(); - structDefaultvalue.add(Integer.valueOf(1)); - structDefaultvalue.add("two"); + Map structDefaultvalue = new HashMap<>(); + structDefaultvalue.put(intFieldType.name(), Integer.valueOf(1)); + structDefaultvalue.put(stringFieldType.name(), "two"); NestedField structField = NestedField.optional(2, "optionalStructField", structType, structDefaultvalue, "doc"); Assert.assertTrue(structField.hasDefaultValue()); Assert.assertEquals(structDefaultvalue, structField.getDefaultValue()); diff --git a/api/src/test/java/org/apache/iceberg/types/TestNestedFieldDefaultValues.java b/api/src/test/java/org/apache/iceberg/types/TestNestedFieldDefaultValues.java index da2904c63e..abe53f930e 100644 --- a/api/src/test/java/org/apache/iceberg/types/TestNestedFieldDefaultValues.java +++ b/api/src/test/java/org/apache/iceberg/types/TestNestedFieldDefaultValues.java @@ -50,6 +50,7 @@ public void testConstructorsValidCases() { // required constructors Assert.assertFalse(required(id, fieldName, fieldType).hasDefaultValue()); Assert.assertFalse(required(id, fieldName, fieldType, doc).hasDefaultValue()); + Assert.assertFalse(required(id, fieldName, fieldType, null, doc).hasDefaultValue()); nestedFieldWithDefault = required(id, fieldName, fieldType, defaultValue, doc); Assert.assertTrue(nestedFieldWithDefault.hasDefaultValue()); Assert.assertEquals(defaultValue, nestedFieldWithDefault.getDefaultValue()); @@ -65,22 +66,17 @@ public void testConstructorsValidCases() { Assert.assertEquals(defaultValue, nestedFieldWithDefault.getDefaultValue()); } - @Test (expected = IllegalArgumentException.class) - public void testRequiredNullDefault() { - // illegal case (required with null defaultValue) - required(id, fieldName, fieldType, null, doc); - } - @Test (expected = IllegalArgumentException.class) - public void testRequiredWithDefaultNullDefault() { - // illegal 
diff --git a/api/src/test/java/org/apache/iceberg/types/TestDefaultValuesForContainerTypes.java b/api/src/test/java/org/apache/iceberg/types/TestDefaultValuesForContainerTypes.java
index 560426fce3..bc68fe1da6 100644
--- a/api/src/test/java/org/apache/iceberg/types/TestDefaultValuesForContainerTypes.java
+++ b/api/src/test/java/org/apache/iceberg/types/TestDefaultValuesForContainerTypes.java
@@ -21,7 +21,9 @@
 import java.util.ArrayList;
 import java.util.Arrays;
+import java.util.HashMap;
 import java.util.List;
+import java.util.Map;
 import org.junit.Assert;
 import org.junit.BeforeClass;
 import org.junit.Test;
@@ -44,9 +46,9 @@ public static void beforeClass() {

   @Test
   public void testStructTypeDefault() {
-    List<Object> structDefaultValue = new ArrayList<>();
-    structDefaultValue.add(Integer.valueOf(1));
-    structDefaultValue.add("two");
+    Map<String, Object> structDefaultValue = new HashMap<>();
+    structDefaultValue.put(intFieldType.name(), Integer.valueOf(1));
+    structDefaultValue.put(stringFieldType.name(), "two");
     NestedField structField = NestedField.optional(2, "optionalStructField", structType, structDefaultValue, "doc");
     Assert.assertTrue(structField.hasDefaultValue());
     Assert.assertEquals(structDefaultValue, structField.getDefaultValue());
diff --git a/api/src/test/java/org/apache/iceberg/types/TestNestedFieldDefaultValues.java b/api/src/test/java/org/apache/iceberg/types/TestNestedFieldDefaultValues.java
index da2904c63e..abe53f930e 100644
--- a/api/src/test/java/org/apache/iceberg/types/TestNestedFieldDefaultValues.java
+++ b/api/src/test/java/org/apache/iceberg/types/TestNestedFieldDefaultValues.java
@@ -50,6 +50,7 @@ public void testConstructorsValidCases() {
     // required constructors
     Assert.assertFalse(required(id, fieldName, fieldType).hasDefaultValue());
     Assert.assertFalse(required(id, fieldName, fieldType, doc).hasDefaultValue());
+    Assert.assertFalse(required(id, fieldName, fieldType, null, doc).hasDefaultValue());
     nestedFieldWithDefault = required(id, fieldName, fieldType, defaultValue, doc);
     Assert.assertTrue(nestedFieldWithDefault.hasDefaultValue());
     Assert.assertEquals(defaultValue, nestedFieldWithDefault.getDefaultValue());
@@ -65,22 +66,17 @@ public void testConstructorsValidCases() {
     Assert.assertEquals(defaultValue, nestedFieldWithDefault.getDefaultValue());
   }

-  @Test (expected = IllegalArgumentException.class)
-  public void testRequiredNullDefault() {
-    // illegal case (required with null defaultValue)
-    required(id, fieldName, fieldType, null, doc);
-  }
-
   @Test (expected = IllegalArgumentException.class)
-  public void testRequiredWithDefaultNullDefault() {
-    // illegal case (required with null defaultValue)
-    required(id, fieldName, fieldType, null, null);
+  public void testOptionalWithInvalidDefaultValueClass() {
+    // class of default value does not match class of type
+    Long wrongClassDefaultValue = 100L;
+    optional(id, fieldName, fieldType, wrongClassDefaultValue, doc);
   }

   @Test (expected = IllegalArgumentException.class)
-  public void testOptionalWithInvalidDefaultValueClass() {
+  public void testRequiredWithInvalidDefaultValueClass() {
     // class of default value does not match class of type
     Long wrongClassDefaultValue = 100L;
-    optional(id, fieldName, fieldType, wrongClassDefaultValue, doc);
+    required(id, fieldName, fieldType, wrongClassDefaultValue, doc);
   }
 }
diff --git a/core/src/main/java/org/apache/iceberg/SchemaParser.java b/core/src/main/java/org/apache/iceberg/SchemaParser.java
index c2baaabe60..d4eed351a3 100644
--- a/core/src/main/java/org/apache/iceberg/SchemaParser.java
+++ b/core/src/main/java/org/apache/iceberg/SchemaParser.java
@@ -19,14 +19,24 @@
 package org.apache.iceberg;

+import com.fasterxml.jackson.annotation.JsonCreator;
+import com.fasterxml.jackson.annotation.JsonProperty;
 import com.fasterxml.jackson.core.JsonGenerator;
+import com.fasterxml.jackson.core.JsonProcessingException;
 import com.fasterxml.jackson.databind.JsonNode;
 import com.github.benmanes.caffeine.cache.Cache;
 import com.github.benmanes.caffeine.cache.Caffeine;
 import java.io.IOException;
 import java.io.StringWriter;
+import java.math.BigDecimal;
+import java.nio.ByteBuffer;
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.HashMap;
 import java.util.Iterator;
+import java.util.LinkedHashMap;
 import java.util.List;
+import java.util.Map;
 import org.apache.iceberg.exceptions.RuntimeIOException;
 import org.apache.iceberg.relocated.com.google.common.base.Preconditions;
 import org.apache.iceberg.relocated.com.google.common.collect.Lists;
@@ -34,6 +44,7 @@
 import org.apache.iceberg.types.Types;
 import org.apache.iceberg.util.JsonUtil;

+
 public class SchemaParser {

   private SchemaParser() {}
@@ -55,6 +66,26 @@ private SchemaParser() {}
   private static final String REQUIRED = "required";
   private static final String ELEMENT_REQUIRED = "element-required";
   private static final String VALUE_REQUIRED = "value-required";
+  private static final String DEFAULT = "default";
+
+  private static final List<Class<?>> primitiveClasses = Arrays.asList(Boolean.class, Integer.class, Long.class,
+      Float.class, Double.class, CharSequence.class, String.class, java.util.UUID.class, BigDecimal.class);
+
+  private static void writeDefaultValue(Object defaultValue, Type type, JsonGenerator generator) throws IOException {
+    if (defaultValue == null) {
+      return;
+    }
+    generator.writeFieldName(DEFAULT);
+    if (type.isListType()) {
+      generator.writeString(defaultValueToJsonString((List) defaultValue));
+    } else if (type.isStructType() || type.isMapType()) {
+      generator.writeString(defaultValueToJsonString((Map) defaultValue));
+    } else if (isFixedOrBinary(type)) {
+      generator.writeString(defaultValueToJsonString((byte[]) defaultValue));
+    } else {
+      generator.writeString(defaultValueToJsonString(defaultValue));
+    }
+  }

   static void toJson(Types.StructType struct, JsonGenerator generator) throws IOException {
     generator.writeStartObject();
@@ -68,13 +99,14 @@
       generator.writeBooleanField(REQUIRED, field.isRequired());
       generator.writeFieldName(TYPE);
       toJson(field.type(), generator);
+      writeDefaultValue(field.getDefaultValue(), field.type(), generator);
       if (field.doc() != null) {
         generator.writeStringField(DOC, field.doc());
       }
       generator.writeEndObject();
     }
     generator.writeEndArray();
     generator.writeEndObject();
   }

@@ -175,6 +206,32 @@ private static Type typeFromJson(JsonNode json) {
     throw new IllegalArgumentException("Cannot parse type from json: " + json);
   }

+  private static boolean isFixedOrBinary(Type type) {
+    return type.typeId() == Type.TypeID.FIXED || type.typeId() == Type.TypeID.BINARY;
+  }
+
+  private static Object defaultValueFromJson(JsonNode field, Type type) {
+    if (!field.has(DEFAULT)) {
+      return null;
+    }
+
+    String defaultValueString = field.get(DEFAULT).asText();
+
+    if (isFixedOrBinary(type)) {
+      return defaultValueFromJsonBytesField(defaultValueString);
+    }
+
+    if (type.isPrimitiveType()) {
+      return primitiveDefaultValueFromJsonString(defaultValueString, type);
+    }
+
+    try {
+      return defaultValueFromJsonString(defaultValueString, type);
+    } catch (IOException e) {
+      throw new RuntimeException(e);
+    }
+  }
+
   private static Types.StructType structFromJson(JsonNode json) {
     JsonNode fieldArray = json.get(FIELDS);
     Preconditions.checkArgument(fieldArray.isArray(),
@@ -190,13 +247,13 @@ private static Types.StructType structFromJson(JsonNode json) {
       int id = JsonUtil.getInt(ID, field);
       String name = JsonUtil.getString(NAME, field);
       Type type = typeFromJson(field.get(TYPE));
-
+      Object defaultValue = defaultValueFromJson(field, type);
       String doc = JsonUtil.getStringOrNull(DOC, field);
       boolean isRequired = JsonUtil.getBool(REQUIRED, field);
       if (isRequired) {
-        fields.add(Types.NestedField.required(id, name, type, doc));
+        fields.add(Types.NestedField.required(id, name, type, defaultValue, doc));
       } else {
-        fields.add(Types.NestedField.optional(id, name, type, doc));
+        fields.add(Types.NestedField.optional(id, name, type, defaultValue, doc));
       }
     }
@@ -252,4 +309,134 @@ public static Schema fromJson(String json) {
       }
     });
   }
+
+  private static String defaultValueToJsonString(Map<String, Object> map) {
+    Map<String, String> jsonStringElementsMap = new LinkedHashMap<>();
+    map.entrySet().forEach(
+        entry -> jsonStringElementsMap.put(entry.getKey(), defaultValueToJsonString(entry.getValue())));
+    return defaultValueToJsonString(jsonStringElementsMap);
+  }
+
+  private static String defaultValueToJsonString(List<Object> list) {
+    List<String> jsonStringItemsList = new ArrayList<>();
+    list.forEach(item -> jsonStringItemsList.add(defaultValueToJsonString(item)));
+    return defaultValueToJsonString(jsonStringItemsList);
+  }
+
+  private static String defaultValueToJsonString(byte[] bytes) {
+    try {
+      return JsonUtil.mapper().writeValueAsString(ByteBuffer.wrap(bytes));
+    } catch (JsonProcessingException e) {
+      throw new RuntimeException(e);
+    }
+  }
+
+  private static String defaultValueToJsonString(Object value) {
+    if (isPrimitiveClass(value)) {
+      return value.toString();
+    }
+
+    try {
+      return JsonUtil.mapper().writeValueAsString(new SerDeValue(value));
+    } catch (JsonProcessingException e) {
+      throw new RuntimeException(e);
+    }
+  }
+
+  private static boolean isPrimitiveClass(Object value) {
+    return primitiveClasses.contains(value.getClass());
+  }
+
+  private static Object defaultValueFromJsonBytesField(String value) {
+    try {
+      return JsonUtil.mapper().readValue(value, ByteBuffer.class).array();
+    } catch (JsonProcessingException e) {
+      throw new RuntimeException(e);
+    }
+  }
+
+  private static Object defaultValueFromJsonString(String jsonString, Type type) throws IOException {
+    Preconditions.checkArgument(!type.isPrimitiveType(), "jsonString %s is for primitive type %s", jsonString, type);
+    Object jsonStringCollection = JsonUtil.mapper().readValue(jsonString, SerDeValue.class).getValue();
+
+    if (type.isListType()) {
+      Preconditions.checkArgument(jsonStringCollection instanceof List,
+          "deserialized Json object: (%s) is not List for List type", jsonStringCollection);
+      List<Object> list = new ArrayList<>();
+      Type elementType = type.asListType().elementType();
+      for (String item : (List<String>) jsonStringCollection) {
+        list.add(elementType.isPrimitiveType() ? primitiveDefaultValueFromJsonString(item, elementType) :
+            JsonUtil.mapper().readValue(item, SerDeValue.class).getValue());
+      }
+      return list;
+    }

+    Preconditions.checkArgument((type.isMapType() || type.isStructType()) && jsonStringCollection instanceof Map,
+        "deserialized Json object: (%s) is not Map for type: %s", jsonStringCollection, type);
+
+    // map (MapType or StructType) case
+    Map<String, Object> map = new HashMap<>();
+    Map<?, ?> jsonStringMap = (Map<?, ?>) jsonStringCollection;
+    for (Map.Entry<?, ?> entry : jsonStringMap.entrySet()) {
+      String key = entry.getKey().toString();
+      String valueString = entry.getValue().toString();
+      Type elementType = type.isMapType() ? type.asMapType().valueType() : type.asStructType().field(key).type();
+      Object value = elementType.isPrimitiveType() ? primitiveDefaultValueFromJsonString(valueString, elementType)
+          : JsonUtil.mapper().readValue(valueString, SerDeValue.class).getValue();
+      map.put(key, value);
+    }
+    return map;
+  }
+
+  private static Object primitiveDefaultValueFromJsonString(String jsonString, Type type) {
+    switch (type.typeId()) {
+      case BOOLEAN:
+        return Boolean.valueOf(jsonString);
+      case INTEGER:
+      case DATE:
+        return Integer.valueOf(jsonString);
+      case DECIMAL:
+        return BigDecimal.valueOf(Long.valueOf(jsonString));
+      case LONG:
+      case TIME:
+      case TIMESTAMP:
+        return Long.valueOf(jsonString);
+      case FLOAT:
+        return Float.valueOf(jsonString);
+      case DOUBLE:
+        return Double.valueOf(jsonString);
+      case STRING:
+        return jsonString;
+      case UUID:
+        return java.util.UUID.fromString(jsonString);
+      case FIXED:
+      case BINARY:
+        return defaultValueFromJsonBytesField(jsonString);
+      default:
+        throw new RuntimeException("non-primitive type: " + type);
+    }
+  }
+
+  /**
+   * Wrapper used to serialize a default value under a named property, so that
+   * the structure of the value is preserved in the serialized String.
+   */
+  private static class SerDeValue {
+    // Name of the field used in the intermediate JSON representation
+    private static final String VALUE_FIELD = "__value__";
+
+    @JsonProperty(VALUE_FIELD)
+    private final Object value;
+
+    @JsonCreator
+    private SerDeValue(@JsonProperty(VALUE_FIELD) Object value) {
+      this.value = value;
+    }
+
+    private Object getValue() {
+      return value;
+    }
+  }
 }
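As a sanity check on the format the parser now emits (the strings below are inferred from writeDefaultValue, defaultValueToJsonString, and SerDeValue above, not quoted from a spec): primitive defaults are stored as their plain string form under the "default" key, while container defaults are rendered element by element and wrapped in the __value__ property so Jackson preserves their structure on the way back in.

    Schema schema = new Schema(
        Types.NestedField.required(1, "count", Types.IntegerType.get(), -1, null));
    String json = SchemaParser.toJson(schema);
    // the field JSON now carries something like:  "default" : "-1"
    Schema roundTripped = SchemaParser.fromJson(json);
    // roundTripped preserves the default; a list default such as [1, 2] would be
    // stored as a string resembling {"__value__":["1","2"]}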
diff --git a/core/src/main/java/org/apache/iceberg/avro/AvroSchemaUtil.java b/core/src/main/java/org/apache/iceberg/avro/AvroSchemaUtil.java
index 5dfe7d5948..cb4c8dd852 100644
--- a/core/src/main/java/org/apache/iceberg/avro/AvroSchemaUtil.java
+++ b/core/src/main/java/org/apache/iceberg/avro/AvroSchemaUtil.java
@@ -152,10 +152,15 @@ public static boolean isOptionalComplexUnion(Schema schema) {
   }

   public static Schema toOption(Schema schema) {
+    return toOption(schema, false);
+  }
+
+  public static Schema toOption(Schema schema, boolean nullIsSecondElement) {
     if (schema.getType() == UNION) {
-      Preconditions.checkArgument(isOptionSchema(schema),
-          "Union schemas are not supported: %s", schema);
+      Preconditions.checkArgument(isOptionSchema(schema), "Union schemas are not supported: %s", schema);
       return schema;
+    } else if (nullIsSecondElement) {
+      return Schema.createUnion(schema, NULL);
     } else {
       return Schema.createUnion(NULL, schema);
     }
@@ -424,4 +429,12 @@ private static String sanitize(char character) {
     }
     return "_x" + Integer.toHexString(character).toUpperCase();
   }
+
+  static boolean hasNonNullDefaultValue(Schema.Field field) {
+    // the schema should use JsonProperties.NULL_VALUE (i.e., null) as the null default
+    // value, but a user might also use the String "null" to indicate null, so we need
+    // to account for that as well.
+    return field.hasDefaultValue() && field.defaultVal() != JsonProperties.NULL_VALUE &&
+        !(field.defaultVal() instanceof String && ((String) field.defaultVal()).equalsIgnoreCase("null"));
+  }
 }
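A quick illustration of the new toOption overload (variable names are illustrative). Avro requires a union field's default value to match the first branch of the union, so a field that carries a non-null default needs "null" in the second position:

    Schema intSchema = Schema.create(Schema.Type.INT);
    Schema nullFirst = AvroSchemaUtil.toOption(intSchema);        // ["null", "int"], as before
    Schema nullSecond = AvroSchemaUtil.toOption(intSchema, true); // ["int", "null"], valid with a non-null default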
diff --git a/core/src/main/java/org/apache/iceberg/avro/BuildAvroProjection.java b/core/src/main/java/org/apache/iceberg/avro/BuildAvroProjection.java
index dafaf63c42..aa98d13cb8 100644
--- a/core/src/main/java/org/apache/iceberg/avro/BuildAvroProjection.java
+++ b/core/src/main/java/org/apache/iceberg/avro/BuildAvroProjection.java
@@ -82,28 +82,53 @@ public Schema record(Schema record, List<String> names, Iterable<Schema.Field> schemas) {
     List<Schema.Field> updatedFields = Lists.newArrayListWithExpectedSize(struct.fields().size());
     List<Types.NestedField> expectedFields = struct.fields();
     for (int i = 0; i < expectedFields.size(); i += 1) {
-      Types.NestedField field = expectedFields.get(i);
-
+      Types.NestedField expectedField = expectedFields.get(i);
       // detect reordering
-      if (i < fields.size() && !field.name().equals(fields.get(i).name())) {
+      if (i < fields.size() && !expectedField.name().equals(fields.get(i).name())) {
         hasChange = true;
       }

-      Schema.Field avroField = updateMap.get(AvroSchemaUtil.makeCompatibleName(field.name()));
+      Schema.Field avroField = updateMap.get(AvroSchemaUtil.makeCompatibleName(expectedField.name()));

       if (avroField != null) {
-        updatedFields.add(avroField);
-
+        // if the expectedField has a default value but the avroField does not, we need to
+        // create a new field to copy over the non-null default value
+        if (expectedField.hasDefaultValue() && !AvroSchemaUtil.hasNonNullDefaultValue(avroField)) {
+          Schema newFieldSchema = expectedField.isOptional() ?
+              AvroSchemaUtil.toOption(AvroSchemaUtil.convert(expectedField.type()), true) :
+              AvroSchemaUtil.convert(expectedField.type());
+          Schema.Field newField =
+              new Schema.Field(avroField.name(), newFieldSchema, avroField.doc(), expectedField.getDefaultValue());
+          newField.addProp(AvroSchemaUtil.FIELD_ID_PROP, expectedField.fieldId());
+          updatedFields.add(newField);
+        } else {
+          // otherwise (i.e., expectedField has no default value, or the default is null) we can use avroField as is
+          updatedFields.add(avroField);
+        }
       } else {
-        Preconditions.checkArgument(
-            field.isOptional() || field.fieldId() == MetadataColumns.ROW_POSITION.fieldId(),
-            "Missing required field: %s", field.name());
-        // Create a field that will be defaulted to null. We assign a unique suffix to the field
-        // to make sure that even if records in the file have the field it is not projected.
-        Schema.Field newField = new Schema.Field(
-            field.name() + "_r" + field.fieldId(),
-            AvroSchemaUtil.toOption(AvroSchemaUtil.convert(field.type())), null, JsonProperties.NULL_VALUE);
-        newField.addProp(AvroSchemaUtil.FIELD_ID_PROP, field.fieldId());
+        // here the expectedField is missing from the file schema, so we verify it is either
+        // an optional field, a metadata column, or a field that has a default value
+        Preconditions.checkArgument(expectedField.isOptional() ||
+            expectedField.fieldId() == MetadataColumns.ROW_POSITION.fieldId() || expectedField.hasDefaultValue(),
+            "Missing required field that has no default value: expectedField: %s, avroField: null, record: %s",
+            expectedField, record);

+        // Create a field that will be defaulted to the expectedField's default value. If there is no
+        // default value, then default to null and assign a unique suffix to the field to make sure that
+        // even if records in the file have the field it is not projected.
+        String newFieldName = expectedField.name();
+        Schema newFieldSchema;
+        Object defaultValue;
+        if (expectedField.hasDefaultValue()) {
+          newFieldSchema = AvroSchemaUtil.convert(expectedField.type());
+          defaultValue = expectedField.getDefaultValue();
+        } else {
+          newFieldName = newFieldName + "_r" + expectedField.fieldId();
+          newFieldSchema = AvroSchemaUtil.toOption(AvroSchemaUtil.convert(expectedField.type()));
+          defaultValue = JsonProperties.NULL_VALUE;
+        }
+        Schema.Field newField = new Schema.Field(newFieldName, newFieldSchema, null, defaultValue);
+        newField.addProp(AvroSchemaUtil.FIELD_ID_PROP, expectedField.fieldId());
         updatedFields.add(newField);
         hasChange = true;
       }
@@ -153,10 +178,10 @@ public Schema union(Schema union, Iterable<Schema> options) {
       Schema nonNullResult = AvroSchemaUtil.fromOptions(Lists.newArrayList(options));
       if (nonNullOriginal != nonNullResult) {
-        return AvroSchemaUtil.toOption(nonNullResult);
+        boolean nullIsSecondOption = union.getTypes().get(1).getType() == Schema.Type.NULL;
+        return AvroSchemaUtil.toOption(nonNullResult, nullIsSecondOption);
       }
     }
-
     return union;
   }
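The net effect of the projection logic above, sketched with hypothetical field names and ids: a field that is missing from the file but has a default is materialized under its own name so the Avro datum reader fills in the default, while a defaultless missing optional field keeps the old null-padded, suffixed treatment.

    // expected field with a default, missing from the file: keep the real name, carry the default
    Schema.Field filled = new Schema.Field("count", Schema.create(Schema.Type.INT), null, -1);
    // expected optional field without a default: unique suffix plus null default, as before
    Schema.Field nullPadded = new Schema.Field("comment_r7",
        AvroSchemaUtil.toOption(Schema.create(Schema.Type.STRING)), null, JsonProperties.NULL_VALUE);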
diff --git a/core/src/main/java/org/apache/iceberg/avro/PruneColumns.java b/core/src/main/java/org/apache/iceberg/avro/PruneColumns.java
index 6cbcedd1df..a0eb8721b6 100644
--- a/core/src/main/java/org/apache/iceberg/avro/PruneColumns.java
+++ b/core/src/main/java/org/apache/iceberg/avro/PruneColumns.java
@@ -260,16 +260,22 @@ private static Schema copyRecord(Schema record, List<Schema.Field> newFields) {

   private static Schema.Field copyField(Schema.Field field, Schema newSchema, Integer fieldId) {
     Schema newSchemaReordered;
-    // if the newSchema is an optional schema, make sure the NULL option is always the first
-    if (isOptionSchemaWithNonNullFirstOption(newSchema)) {
+    // if the newSchema is an optional schema with no default value, or a null default value,
+    // make sure the NULL option is the first
+    boolean hasNonNullDefaultValue = AvroSchemaUtil.hasNonNullDefaultValue(field);
+    if (isOptionSchemaWithNonNullFirstOption(newSchema) && !hasNonNullDefaultValue) {
       newSchemaReordered = AvroSchemaUtil.toOption(AvroSchemaUtil.fromOption(newSchema));
+    } else if (AvroSchemaUtil.isOptionSchema(newSchema) && hasNonNullDefaultValue) {
+      // otherwise, if the newSchema is an optional schema that has a non-null default value,
+      // make sure the NULL option is the second
+      newSchemaReordered = AvroSchemaUtil.toOption(AvroSchemaUtil.fromOption(newSchema), true);
     } else {
       newSchemaReordered = newSchema;
     }
-    // do not copy over default values as the file is expected to have values for fields already in the file schema
-    Schema.Field copy = new Schema.Field(field.name(),
-        newSchemaReordered, field.doc(),
-        AvroSchemaUtil.isOptionSchema(newSchemaReordered) ? JsonProperties.NULL_VALUE : null, field.order());
+    // copy over non-null default values
+    Object defaultValue = hasNonNullDefaultValue ? field.defaultVal() :
+        (AvroSchemaUtil.isOptionSchema(newSchemaReordered) ? JsonProperties.NULL_VALUE : null);
+    Schema.Field copy = new Schema.Field(field.name(), newSchemaReordered, field.doc(), defaultValue, field.order());

     for (Map.Entry<String, Object> prop : field.getObjectProps().entrySet()) {
       copy.addProp(prop.getKey(), prop.getValue());
diff --git a/core/src/main/java/org/apache/iceberg/avro/SchemaToType.java b/core/src/main/java/org/apache/iceberg/avro/SchemaToType.java
index 49cd1e45a2..cda7c4a0e7 100644
--- a/core/src/main/java/org/apache/iceberg/avro/SchemaToType.java
+++ b/core/src/main/java/org/apache/iceberg/avro/SchemaToType.java
@@ -20,7 +20,6 @@
 package org.apache.iceberg.avro;

 import java.util.List;
-import org.apache.avro.JsonProperties;
 import org.apache.avro.LogicalType;
 import org.apache.avro.LogicalTypes;
 import org.apache.avro.Schema;
@@ -93,15 +92,12 @@ public Type record(Schema record, List<String> names, List<Type> fieldTypes) {
       Type fieldType = fieldTypes.get(i);
       int fieldId = getId(field);

-      Object defaultValue = field.hasDefaultValue() && !(field.defaultVal() instanceof JsonProperties.Null) ?
-          field.defaultVal() : null;
+      Object defaultValue = AvroSchemaUtil.hasNonNullDefaultValue(field) ? field.defaultVal() : null;

       if (AvroSchemaUtil.isOptionSchema(field.schema()) || AvroSchemaUtil.isOptionalComplexUnion(field.schema())) {
         newFields.add(Types.NestedField.optional(fieldId, field.name(), fieldType, defaultValue, null));
-      } else if (defaultValue != null) {
-        newFields.add(Types.NestedField.required(fieldId, field.name(), fieldType, defaultValue, null));
       } else {
-        newFields.add(Types.NestedField.required(fieldId, field.name(), fieldType));
+        newFields.add(Types.NestedField.required(fieldId, field.name(), fieldType, defaultValue, null));
       }
     }
diff --git a/core/src/main/java/org/apache/iceberg/avro/TypeToSchema.java b/core/src/main/java/org/apache/iceberg/avro/TypeToSchema.java
index 9e5255d8a0..28213e0bda 100644
--- a/core/src/main/java/org/apache/iceberg/avro/TypeToSchema.java
+++ b/core/src/main/java/org/apache/iceberg/avro/TypeToSchema.java
@@ -121,7 +121,7 @@ public Schema struct(Types.StructType struct, List<Schema> fieldSchemas) {
   @Override
   public Schema field(Types.NestedField field, Schema fieldSchema) {
     if (field.isOptional()) {
-      return AvroSchemaUtil.toOption(fieldSchema);
+      return AvroSchemaUtil.toOption(fieldSchema, field.hasDefaultValue());
     } else {
       return fieldSchema;
     }
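In Avro schema JSON, the round trip that SchemaToType and TypeToSchema now support looks roughly like this (field names and values are illustrative; per the Avro spec, a union field's default must match the union's first branch):

    {"name": "count", "type": "int", "default": -1}
    // -> required Iceberg field with default -1
    {"name": "tag", "type": ["string", "null"], "default": "none"}
    // -> optional Iceberg field with default "none"; note null is the second branch
    {"name": "note", "type": ["null", "string"], "default": null}
    // -> optional Iceberg field with no (null) default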
diff --git a/core/src/test/java/org/apache/iceberg/TestSchemaParserForDefaultValues.java b/core/src/test/java/org/apache/iceberg/TestSchemaParserForDefaultValues.java
new file mode 100644
index 0000000000..853b9ad2b4
--- /dev/null
+++ b/core/src/test/java/org/apache/iceberg/TestSchemaParserForDefaultValues.java
@@ -0,0 +1,280 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.iceberg;
+
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.List;
+import java.util.Map;
+import java.util.stream.IntStream;
+import org.apache.avro.Schema;
+import org.apache.avro.Schema.Field;
+import org.apache.iceberg.avro.AvroSchemaUtil;
+import org.apache.iceberg.relocated.com.google.common.collect.ImmutableList;
+import org.apache.iceberg.relocated.com.google.common.collect.ImmutableMap;
+import org.apache.iceberg.types.Types.NestedField;
+import org.junit.Assert;
+import org.junit.Test;
+
+import static org.apache.avro.Schema.Type.BOOLEAN;
+import static org.apache.avro.Schema.Type.BYTES;
+import static org.apache.avro.Schema.Type.DOUBLE;
+import static org.apache.avro.Schema.Type.FLOAT;
+import static org.apache.avro.Schema.Type.INT;
+import static org.apache.avro.Schema.Type.LONG;
+import static org.apache.avro.Schema.Type.NULL;
+import static org.apache.avro.Schema.Type.STRING;
+
+
+public class TestSchemaParserForDefaultValues {
+
+  private void assertEqualStructs(org.apache.iceberg.Schema expected, org.apache.iceberg.Schema actual) {
+    if (expected == null) {
+      Assert.assertNull(actual);
+      return;
+    }
+    Assert.assertNotNull(actual);
+
+    List<NestedField> expectedFields = expected.asStruct().fields();
+    List<NestedField> actualFields = actual.asStruct().fields();
+    Assert.assertEquals(expectedFields.size(), actualFields.size());
+
+    for (int i = 0; i < expectedFields.size(); i++) {
+      NestedField expectedField = expectedFields.get(i);
+      NestedField actualField = actualFields.get(i);
+      Assert.assertEquals(expectedField.fieldId(), actualField.fieldId());
+      Assert.assertEquals(expectedField.name(), actualField.name());
+      Assert.assertEquals(expectedField.type(), actualField.type());
+      Assert.assertEquals(expectedField.doc(), actualField.doc());
+      if (expectedField.hasDefaultValue()) {
+        Assert.assertTrue(actualField.hasDefaultValue());
+        switch (expectedField.type().typeId()) {
+          case BINARY:
+          case FIXED:
+            Assert.assertTrue(
+                Arrays.equals((byte[]) expectedField.getDefaultValue(), (byte[]) actualField.getDefaultValue()));
+            break;
+          default:
+            Assert.assertEquals(expectedField.getDefaultValue(), actualField.getDefaultValue());
+        }
+      } else {
+        Assert.assertFalse(actualField.hasDefaultValue());
+      }
+    }
+  }
+
+  private void testToFromJsonPreservingDefaultValues(String[] fieldNames, Schema[] fieldsSchemas, Object[] defaults) {
+    List<Field> fields = new ArrayList<>();
+    IntStream.range(0, defaults.length).forEach(
+        i -> fields.add(new Schema.Field(fieldNames[i], fieldsSchemas[i], null, defaults[i])));
+
+    Schema schema = Schema.createRecord("root", null, null, false, fields);
+    org.apache.iceberg.Schema icebergSchema = AvroSchemaUtil.toIceberg(schema);
+    String jsonString = SchemaParser.toJson(icebergSchema);
+
+    Assert.assertTrue(jsonString.contains("default"));
+
+    org.apache.iceberg.Schema icebergSchemaFromJson = SchemaParser.fromJson(jsonString);
+
+    assertEqualStructs(icebergSchema, icebergSchemaFromJson);
+  }
+
+  @Test
+  public void testPrimitiveTypes() {
+    Boolean defaultBoolean = true;
+    Integer defaultInt = 1;
+    Long defaultLong = -1L;
+    Double defaultDouble = 0.1;
+    Float defaultFloat = 0.1f;
+    String defaultString = "default string";
+    String defaultBytes = "1111";
+    int fixedSize = defaultBytes.getBytes().length;
+
+    String[] fieldNames = {
+        "booleanField",
+        "intField",
+        "longField",
+        "doubleField",
+        "floatField",
+        "stringField",
+        "binaryField",
+        "fixedField"};
+
+    Object[] defaults = {
+        defaultBoolean,
+        defaultInt,
+        defaultLong,
+        defaultDouble,
+        defaultFloat,
+        defaultString,
+        defaultBytes,
+        defaultBytes};
+
+    Schema[] primitives = {
+        Schema.create(BOOLEAN),
+        Schema.create(INT),
+        Schema.create(LONG),
+        Schema.create(DOUBLE),
+        Schema.create(FLOAT),
+        Schema.create(STRING),
+        Schema.create(BYTES),
+        Schema.createFixed("md5", null, "namespace", fixedSize)};
+
+    testToFromJsonPreservingDefaultValues(fieldNames, primitives, defaults);
+  }
+
+  @Test
+  public void testLogicalTypes() {
+    Long longDefault = Long.valueOf(1234556789);
+    String[] fieldNames = {
+        "dateField",
+        "timestampField",
+        "uuidField",
+        "decimalField"};
+
+    Object[] defaults = {
+        Integer.valueOf(123446),
+        longDefault,
+        "randomUUID",
+        longDefault};
+
+    Schema dateSchema = Schema.create(INT);
+    dateSchema.addProp("logicaltype", "date");
+    Schema timestampSchema = Schema.create(LONG);
+    timestampSchema.addProp("logicaltype", "timestamp");
+    Schema uuidSchema = Schema.create(STRING);
+    uuidSchema.addProp("logicaltype", "UUID");
+    Schema bigDecimalSchema = Schema.create(LONG);
+    bigDecimalSchema.addProp("logicaltype", "decimal");
+
+    Schema[] logicals = {
+        dateSchema,
+        timestampSchema,
+        uuidSchema,
+        bigDecimalSchema};
+
+    testToFromJsonPreservingDefaultValues(fieldNames, logicals, defaults);
+  }
+
+  @Test
+  public void testNestedTypes() {
+    String structStringFieldName = "stringFieldOfStruct";
+    String structBooleanFieldName = "booleanFieldOfStruct";
+    Map<String, Object> defaultStruct = ImmutableMap.of(structStringFieldName, "default string",
+        structBooleanFieldName, Boolean.TRUE);
+    List<Integer> defaultList = Arrays.asList(1, 2);
+    Map<String, Long> defaultMap = ImmutableMap.of("key1", Long.valueOf(1L), "key2", Long.valueOf(2L));
+    List<Field> structFields = ImmutableList.of(
+        new Schema.Field(structStringFieldName, Schema.create(STRING), null),
+        new Schema.Field(structBooleanFieldName, Schema.create(BOOLEAN), null));
+
+    String[] fieldNames = {"structField", "listField", "mapField"};
+    Object[] defaults = {defaultStruct, defaultList, defaultMap};
+    Schema[] nested = {
+        Schema.createRecord("name", null, "namespace", false, structFields),
+        Schema.createArray(Schema.create(INT)),
+        Schema.createMap(Schema.create(LONG))};
+
+    testToFromJsonPreservingDefaultValues(fieldNames, nested, defaults);
+  }
+
+  @Test
+  public void testOptionalWithDefault() {
+    Integer defaultInt = 1;
+    Map<String, Long> defaultMap = ImmutableMap.of("key1", Long.valueOf(1L), "key2", Long.valueOf(2L));
+
+    String[] fieldNames = {"optionalPrimitive", "optionalNested"};
+    Schema[] optionals = {
+        Schema.createUnion(Schema.create(INT), Schema.create(NULL)),
+        Schema.createUnion(Schema.createMap(Schema.create(LONG)), Schema.create(NULL))};
+    Object[] defaults = {defaultInt, defaultMap};
+
+    testToFromJsonPreservingDefaultValues(fieldNames, optionals, defaults);
+  }
+
+  @Test
+  public void testNestedOfNestedWithDefault() {
+    Integer defaultInt = 1;
+    Map<String, Long> defaultMap = ImmutableMap.of("key1", Long.valueOf(1L), "key2", Long.valueOf(2L));
+
+    String structIntField = "intFieldOfStruct";
+    String structMapFieldName = "mapFieldOfStruct";
+    List<Field> structFields = ImmutableList.of(
+        new Schema.Field(structIntField, Schema.create(INT), null, defaultInt),
+        new Schema.Field(structMapFieldName, Schema.createMap(Schema.create(LONG)), null, defaultMap));
+
+    String[] fieldNames = {"intFieldNoDefault", "structFieldNoDefault"};
+    Schema[] topLevelFields = {
+        Schema.create(INT),
+        Schema.createRecord("name", null, "namespace", false, structFields)};
+
+    List<Field> fields = new ArrayList<>();
+    IntStream.range(0, fieldNames.length).forEach(
+        i -> fields.add(new Schema.Field(fieldNames[i], topLevelFields[i], null)));
+
+    Schema schema = org.apache.avro.Schema.createRecord("root", null, null, false, fields);
+    org.apache.iceberg.Schema icebergSchema = AvroSchemaUtil.toIceberg(schema);
+    String jsonString = SchemaParser.toJson(icebergSchema);
+
+    Assert.assertTrue(jsonString.contains("default"));
+
+    org.apache.iceberg.Schema fromJsonIcebergSchema = SchemaParser.fromJson(jsonString);
+    Assert.assertEquals(icebergSchema.toString(), fromJsonIcebergSchema.toString());
+  }
+
+  @Test
+  public void testDeepNestedWithDefault() {
+    Integer defaultInt = 1;
+    Map<String, Long> defaultMap = ImmutableMap.of("key1", Long.valueOf(1L), "key2", Long.valueOf(2L));
+
+    String structIntField = "intFieldOfStruct";
+    String structMapFieldName = "mapFieldOfStruct";
+    List<Field> structFields = ImmutableList.of(
+        new Schema.Field(structIntField, Schema.create(INT), null, defaultInt),
+        new Schema.Field(structMapFieldName, Schema.createMap(Schema.create(LONG)), null, defaultMap));
+
+    Schema downLevelStruct = Schema.createRecord("name", null, "namespace0", false, structFields);
+
+    List<Field> intermediateStructFields = ImmutableList.of(
+        new Schema.Field("intermediateIntField", Schema.create(INT), null),
+        new Schema.Field("intermediateStructField", downLevelStruct, null));
+
+    Schema intermediateStruct = Schema.createRecord("name", null, "namespace1", false, intermediateStructFields);
+    String[] fieldNames = {"topLevelLong", "topLevelString", "topLevelStruct"};
+    Schema[] topLevelFields = {
+        Schema.create(LONG),
+        Schema.create(STRING),
+        intermediateStruct};
+
+    List<Field> fields = new ArrayList<>();
+    IntStream.range(0, fieldNames.length).forEach(
+        i -> fields.add(new Schema.Field(fieldNames[i], topLevelFields[i], null)));
+
+    Schema schema = org.apache.avro.Schema.createRecord("root", null, null, false, fields);
+    org.apache.iceberg.Schema icebergSchema = AvroSchemaUtil.toIceberg(schema);
+    String jsonString = SchemaParser.toJson(icebergSchema);
+
+    Assert.assertTrue(jsonString.contains("default"));
+
+    org.apache.iceberg.Schema fromJsonIcebergSchema = SchemaParser.fromJson(jsonString);
+    Assert.assertEquals(icebergSchema.toString(), fromJsonIcebergSchema.toString());
+  }
+}
diff --git a/core/src/test/java/org/apache/iceberg/avro/TestAvroNameMapping.java b/core/src/test/java/org/apache/iceberg/avro/TestAvroNameMapping.java
index f3e75e1f2a..8d8fbb97f8 100644
--- a/core/src/test/java/org/apache/iceberg/avro/TestAvroNameMapping.java
+++ b/core/src/test/java/org/apache/iceberg/avro/TestAvroNameMapping.java
@@ -173,7 +173,7 @@ public void testMissingRequiredFields() {
     Schema readSchema = writeSchema;

     AssertHelpers.assertThrows("Missing required field in nameMapping",
-        IllegalArgumentException.class, "Missing required field: x",
+        IllegalArgumentException.class, // In this case, pruneColumns result is an empty record
         () -> writeAndRead(writeSchema, readSchema, record, nameMapping));
   }
diff --git a/core/src/test/java/org/apache/iceberg/avro/TestAvroOptionsWithNonNullDefaults.java b/core/src/test/java/org/apache/iceberg/avro/TestAvroOptionsWithNonNullDefaults.java
index 13efc18a77..575563a088 100644
--- a/core/src/test/java/org/apache/iceberg/avro/TestAvroOptionsWithNonNullDefaults.java
+++ b/core/src/test/java/org/apache/iceberg/avro/TestAvroOptionsWithNonNullDefaults.java
@@ -21,6 +21,7 @@
 import java.io.File;
 import java.io.IOException;
+import java.util.Arrays;
 import java.util.List;
 import org.apache.avro.Schema;
 import org.apache.avro.file.DataFileWriter;
@@ -40,6 +41,9 @@

 public class TestAvroOptionsWithNonNullDefaults {

+  private static final String fieldWithDefaultName = "fieldWithDefault";
+  private static final String noDefaultFieldName = "noDefaultField";
+
   @Rule
   public TemporaryFolder temp = new TemporaryFolder();

@@ -125,4 +129,116 @@ public void writeAndValidateOptionWithNonNullDefaultsEvolution() throws IOExcept
       AvroTestHelpers.assertEquals(readIcebergSchema.asStruct(), expected.get(i), rows.get(i));
     }
   }
+
+  @Test
+  public void testDefaultValueUsedPrimitiveType() throws IOException {
+    Schema writeSchema = Schema.createRecord("root", null, null, false, ImmutableList.of(
+        new Schema.Field(noDefaultFieldName, Schema.create(INT), null, null)));
+    // evolved schema
+    Schema readSchema = Schema.createRecord("root", null, null, false, ImmutableList.of(
+        new Schema.Field(noDefaultFieldName, Schema.create(INT), null, null),
+        new Schema.Field(fieldWithDefaultName, Schema.create(INT), null, -1)));
+
+    GenericData.Record record1 = new GenericData.Record(writeSchema);
+    record1.put(noDefaultFieldName, 1);
+    GenericData.Record record2 = new GenericData.Record(writeSchema);
+    record2.put(noDefaultFieldName, 2);
+
+    File testFile = temp.newFile();
+    Assert.assertTrue("Delete should succeed", testFile.delete());
+
+    try (DataFileWriter<GenericData.Record> writer = new DataFileWriter<>(new GenericDatumWriter<>())) {
+      writer.create(writeSchema, testFile);
+      writer.append(record1);
+      writer.append(record2);
+    }
+
+    List<GenericData.Record> expected = ImmutableList.of(record1, record2);
+    org.apache.iceberg.Schema readIcebergSchema = AvroSchemaUtil.toIceberg(readSchema);
+    List<GenericData.Record> rows;
+    try (AvroIterable<GenericData.Record> reader =
+             Avro.read(Files.localInput(testFile)).project(readIcebergSchema).build()) {
+      rows = Lists.newArrayList(reader);
+    }
+
+    for (int i = 0; i < expected.size(); i += 1) {
+      Assert.assertEquals(expected.get(i).get(noDefaultFieldName), rows.get(i).get(noDefaultFieldName));
+      // default should be used for records missing the field
+      Assert.assertEquals(-1, rows.get(i).get(fieldWithDefaultName));
+    }
+  }
+
+  @Test
+  public void testDefaultValueNotUsedWhenFieldHasValue() throws IOException {
+    Schema readSchema = Schema.createRecord("root", null, null, false, ImmutableList.of(
+        new Schema.Field(noDefaultFieldName, Schema.create(INT), null, null),
+        new Schema.Field(fieldWithDefaultName, Schema.create(INT), null, -1)));
+
+    GenericData.Record record1 = new GenericData.Record(readSchema);
+    record1.put(noDefaultFieldName, 3);
+    record1.put(fieldWithDefaultName, 3);
+
+    File testFile = temp.newFile();
+    Assert.assertTrue("Delete should succeed", testFile.delete());
+
+    try (DataFileWriter<GenericData.Record> writer = new DataFileWriter<>(new GenericDatumWriter<>())) {
+      writer.create(readSchema, testFile);
+      writer.append(record1);
+    }
+
+    List<GenericData.Record> expected = ImmutableList.of(record1);
+    org.apache.iceberg.Schema readIcebergSchema = AvroSchemaUtil.toIceberg(readSchema);
+    List<GenericData.Record> rows;
+    try (AvroIterable<GenericData.Record> reader =
+             Avro.read(Files.localInput(testFile)).project(readIcebergSchema).build()) {
+      rows = Lists.newArrayList(reader);
+    }
+
+    for (int i = 0; i < expected.size(); i += 1) {
+      Assert.assertEquals(expected.get(i).get(noDefaultFieldName), rows.get(i).get(noDefaultFieldName));
+      // default value should NOT be used if the field is populated
+      Assert.assertEquals(expected.get(i).get(fieldWithDefaultName), rows.get(i).get(fieldWithDefaultName));
+    }
+  }
+
+  @Test
+  public void testDefaultValueUsedComplexType() throws IOException {
+    Schema writeSchema = Schema.createRecord("root", null, null, false, ImmutableList.of(
+        new Schema.Field(noDefaultFieldName, Schema.create(INT), null, null)));
+    // evolved schema
+    List<Integer> defaultArray = Arrays.asList(-1, -2);
+    Schema readSchema = Schema.createRecord("root", null, null, false, ImmutableList.of(
+        new Schema.Field(noDefaultFieldName, Schema.create(INT), null, null),
+        new Schema.Field(fieldWithDefaultName, Schema.createArray(Schema.create(INT)), null, defaultArray)));
+
+    GenericData.Record record1 = new GenericData.Record(writeSchema);
+    record1.put(noDefaultFieldName, 1);
+    GenericData.Record record2 = new GenericData.Record(writeSchema);
+    record2.put(noDefaultFieldName, 2);
+
+    File testFile = temp.newFile();
+    Assert.assertTrue("Delete should succeed", testFile.delete());
+
+    try (DataFileWriter<GenericData.Record> writer = new DataFileWriter<>(new GenericDatumWriter<>())) {
+      writer.create(writeSchema, testFile);
+      writer.append(record1);
+      writer.append(record2);
+    }
+
+    List<GenericData.Record> expected = ImmutableList.of(record1, record2);
+    org.apache.iceberg.Schema readIcebergSchema = AvroSchemaUtil.toIceberg(readSchema);
+    List<GenericData.Record> rows;
+    try (AvroIterable<GenericData.Record> reader =
+             Avro.read(Files.localInput(testFile)).project(readIcebergSchema).build()) {
+      rows = Lists.newArrayList(reader);
+    }
+
+    for (int i = 0; i < expected.size(); i += 1) {
+      Assert.assertEquals(expected.get(i).get(noDefaultFieldName), rows.get(i).get(noDefaultFieldName));
+      // default should be used for records missing the field
+      Assert.assertEquals(defaultArray, rows.get(i).get(fieldWithDefaultName));
+    }
+  }
 }
diff --git a/spark/src/test/java/org/apache/iceberg/spark/data/TestSparkAvroReaderForFieldsWithDefaultValue.java b/spark/src/test/java/org/apache/iceberg/spark/data/TestSparkAvroReaderForFieldsWithDefaultValue.java
new file mode 100644
index 0000000000..388397f0e7
--- /dev/null
+++ b/spark/src/test/java/org/apache/iceberg/spark/data/TestSparkAvroReaderForFieldsWithDefaultValue.java
@@ -0,0 +1,122 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.iceberg.spark.data;
+
+import java.io.File;
+import java.io.IOException;
+import java.util.List;
+import org.apache.avro.generic.GenericData;
+import org.apache.iceberg.Files;
+import org.apache.iceberg.Schema;
+import org.apache.iceberg.avro.Avro;
+import org.apache.iceberg.avro.AvroIterable;
+import org.apache.iceberg.avro.AvroSchemaUtil;
+import org.apache.iceberg.io.FileAppender;
+import org.apache.iceberg.relocated.com.google.common.collect.ImmutableList;
+import org.apache.iceberg.relocated.com.google.common.collect.Lists;
+import org.apache.iceberg.types.Type;
+import org.apache.iceberg.types.Types;
+import org.apache.spark.sql.catalyst.InternalRow;
+import org.junit.Assert;
+import org.junit.Rule;
+import org.junit.Test;
+import org.junit.rules.TemporaryFolder;
+
+import static org.apache.avro.Schema.Type.INT;
+import static org.apache.avro.Schema.Type.NULL;
+import static org.apache.iceberg.spark.SparkSchemaUtil.convert;
+
+public class TestSparkAvroReaderForFieldsWithDefaultValue {
+
+  @Rule
+  public TemporaryFolder temp = new TemporaryFolder();
+
+  @Test
+  public void testAvroDefaultValues() throws IOException {
+    String indexFieldName = "index";
+    String nullableFieldName = "optionalFieldWithDefault";
+    String requiredFieldName = "requiredFieldWithDefault";
+    int defaultValue = -1;
+
+    // write records with the initial writeSchema
+    org.apache.avro.Schema writeSchema = org.apache.avro.Schema.createRecord("root", null, null, false,
+        ImmutableList.of(
+            new org.apache.avro.Schema.Field(indexFieldName, org.apache.avro.Schema.create(INT), null, null),
+            new org.apache.avro.Schema.Field(nullableFieldName,
+                org.apache.avro.Schema.createUnion(org.apache.avro.Schema.create(INT),
+                    org.apache.avro.Schema.create(NULL)), null, defaultValue)));
+
+    Schema icebergWriteSchema = AvroSchemaUtil.toIceberg(writeSchema);
+    List<GenericData.Record> expected = RandomData.generateList(icebergWriteSchema, 2, 0L);
+
+    File testFile = temp.newFile();
+    Assert.assertTrue("Delete should succeed", testFile.delete());
+
+    try (FileAppender<GenericData.Record> writer = Avro.write(Files.localOutput(testFile))
+        .schema(icebergWriteSchema)
+        .named("test")
+        .build()) {
+      for (GenericData.Record rec : expected) {
+        writer.add(rec);
+      }
+    }
+
+    // evolve the schema by adding a required field with a default value
+    org.apache.avro.Schema evolvedSchema = org.apache.avro.Schema.createRecord("root", null, null, false,
+        ImmutableList.of(
+            new org.apache.avro.Schema.Field(indexFieldName, org.apache.avro.Schema.create(INT), null, null),
+            new org.apache.avro.Schema.Field(nullableFieldName,
+                org.apache.avro.Schema.createUnion(org.apache.avro.Schema.create(INT),
+                    org.apache.avro.Schema.create(NULL)), null, defaultValue),
+            new org.apache.avro.Schema.Field(requiredFieldName, org.apache.avro.Schema.create(INT), null,
+                defaultValue)));
+
+    // read the written rows with the evolved schema
+    List<InternalRow> rows;
+    Schema icebergReadSchema = AvroSchemaUtil.toIceberg(evolvedSchema);
+    try (AvroIterable<InternalRow> reader = Avro.read(Files.localInput(testFile))
+        .createReaderFunc(SparkAvroReader::new)
+        .project(icebergReadSchema)
+        .build()) {
+      rows = Lists.newArrayList(reader);
+    }
+
+    // validate that all rows and all fields are read properly
+    Assert.assertNotNull(rows);
+    Assert.assertEquals(expected.size(), rows.size());
+    for (int row = 0; row < expected.size(); row++) {
+      GenericData.Record expectedRow = expected.get(row);
+      InternalRow actualRow = rows.get(row);
+      List<Types.NestedField> fields = icebergReadSchema.asStruct().fields();
+
+      for (int i = 0; i < fields.size(); i += 1) {
+        Object expectedValue = null;
+        if (i >= writeSchema.getFields().size() && fields.get(i).hasDefaultValue()) {
+          // field was added by the evolution: its default value is expected
+          expectedValue = fields.get(i).getDefaultValue();
+        } else if (i < writeSchema.getFields().size()) {
+          // field existed at write time: the written value is expected
+          expectedValue = expectedRow.get(i);
+        }
+        Type fieldType = fields.get(i).type();
+        Object actualValue = actualRow.isNullAt(i) ? null : actualRow.get(i, convert(fieldType));
+        Assert.assertEquals(expectedValue, actualValue);
+      }
+    }
+  }
+}