diff --git a/.palantir/revapi.yml b/.palantir/revapi.yml index 053529a21c47..6c9659fdcd34 100644 --- a/.palantir/revapi.yml +++ b/.palantir/revapi.yml @@ -406,6 +406,10 @@ acceptedBreaks: justification: "Removing deprecations for 1.2.0" "1.2.0": org.apache.iceberg:iceberg-api: + - code: "java.class.defaultSerializationChanged" + old: "class org.apache.iceberg.types.Types.NestedField" + new: "class org.apache.iceberg.types.Types.NestedField" + justification: "Serialization across versions is not supported" - code: "java.field.constantValueChanged" old: "field org.apache.iceberg.actions.RewriteDataFiles.MAX_CONCURRENT_FILE_GROUP_REWRITES_DEFAULT" new: "field org.apache.iceberg.actions.RewriteDataFiles.MAX_CONCURRENT_FILE_GROUP_REWRITES_DEFAULT" diff --git a/api/src/main/java/org/apache/iceberg/types/PruneColumns.java b/api/src/main/java/org/apache/iceberg/types/PruneColumns.java index daf2e6bbc0ca..52b01c71e1b6 100644 --- a/api/src/main/java/org/apache/iceberg/types/PruneColumns.java +++ b/api/src/main/java/org/apache/iceberg/types/PruneColumns.java @@ -68,11 +68,21 @@ public Type struct(Types.StructType struct, List fieldResults) { if (field.isOptional()) { selectedFields.add( Types.NestedField.optional( - field.fieldId(), field.name(), projectedType, field.doc())); + field.fieldId(), + field.name(), + projectedType, + field.doc(), + field.initialDefault(), + field.writeDefault())); } else { selectedFields.add( Types.NestedField.required( - field.fieldId(), field.name(), projectedType, field.doc())); + field.fieldId(), + field.name(), + projectedType, + field.doc(), + field.initialDefault(), + field.writeDefault())); } } } diff --git a/api/src/main/java/org/apache/iceberg/types/Types.java b/api/src/main/java/org/apache/iceberg/types/Types.java index da70dd9ac6ab..9fcb9c73a222 100644 --- a/api/src/main/java/org/apache/iceberg/types/Types.java +++ b/api/src/main/java/org/apache/iceberg/types/Types.java @@ -413,27 +413,48 @@ public int hashCode() { public static class NestedField implements Serializable { public static NestedField optional(int id, String name, Type type) { - return new NestedField(true, id, name, type, null); + return new NestedField(true, id, name, type, null, null, null); } public static NestedField optional(int id, String name, Type type, String doc) { - return new NestedField(true, id, name, type, doc); + return new NestedField(true, id, name, type, doc, null, null); + } + + public static NestedField optional( + int id, String name, Type type, String doc, Object initialDefault, Object writeDefault) { + return new NestedField(true, id, name, type, doc, initialDefault, writeDefault); } public static NestedField required(int id, String name, Type type) { - return new NestedField(false, id, name, type, null); + return new NestedField(false, id, name, type, null, null, null); } public static NestedField required(int id, String name, Type type, String doc) { - return new NestedField(false, id, name, type, doc); + return new NestedField(false, id, name, type, doc, null, null); + } + + public static NestedField required( + int id, String name, Type type, String doc, Object initialDefault, Object writeDefault) { + return new NestedField(false, id, name, type, doc, initialDefault, writeDefault); } public static NestedField of(int id, boolean isOptional, String name, Type type) { - return new NestedField(isOptional, id, name, type, null); + return new NestedField(isOptional, id, name, type, null, null, null); } public static NestedField of(int id, boolean isOptional, String name, Type type, String doc) { - return new NestedField(isOptional, id, name, type, doc); + return new NestedField(isOptional, id, name, type, doc, null, null); + } + + public static NestedField of( + int id, + boolean isOptional, + String name, + Type type, + String doc, + Object initialDefault, + Object writeDefault) { + return new NestedField(isOptional, id, name, type, doc, initialDefault, writeDefault); } private final boolean isOptional; @@ -441,8 +462,17 @@ public static NestedField of(int id, boolean isOptional, String name, Type type, private final String name; private final Type type; private final String doc; - - private NestedField(boolean isOptional, int id, String name, Type type, String doc) { + private final Object initialDefault; + private final Object writeDefault; + + public NestedField( + boolean isOptional, + int id, + String name, + Type type, + String doc, + Object initialDefault, + Object writeDefault) { Preconditions.checkNotNull(name, "Name cannot be null"); Preconditions.checkNotNull(type, "Type cannot be null"); this.isOptional = isOptional; @@ -450,6 +480,8 @@ private NestedField(boolean isOptional, int id, String name, Type type, String d this.name = name; this.type = type; this.doc = doc; + this.initialDefault = initialDefault; + this.writeDefault = writeDefault; } public boolean isOptional() { @@ -460,7 +492,7 @@ public NestedField asOptional() { if (isOptional) { return this; } - return new NestedField(true, id, name, type, doc); + return new NestedField(true, id, name, type, doc, initialDefault, writeDefault); } public boolean isRequired() { @@ -471,7 +503,11 @@ public NestedField asRequired() { if (!isOptional) { return this; } - return new NestedField(false, id, name, type, doc); + return new NestedField(false, id, name, type, doc, initialDefault, writeDefault); + } + + public NestedField withWriteDefault(Object newWriteDefault) { + return new NestedField(isOptional, id, name, type, doc, initialDefault, newWriteDefault); } public int fieldId() { @@ -490,6 +526,14 @@ public String doc() { return doc; } + public Object initialDefault() { + return initialDefault; + } + + public Object writeDefault() { + return writeDefault; + } + @Override public String toString() { return String.format("%d: %s: %s %s", id, name, isOptional ? "optional" : "required", type) diff --git a/core/src/main/java/org/apache/iceberg/avro/BuildAvroProjection.java b/core/src/main/java/org/apache/iceberg/avro/BuildAvroProjection.java index c5c78dd1472a..c731ee1d1b11 100644 --- a/core/src/main/java/org/apache/iceberg/avro/BuildAvroProjection.java +++ b/core/src/main/java/org/apache/iceberg/avro/BuildAvroProjection.java @@ -103,9 +103,17 @@ public Schema record(Schema record, List names, Iterable s } else { Preconditions.checkArgument( - field.isOptional() || MetadataColumns.metadataFieldIds().contains(field.fieldId()), + (field.isRequired() && field.initialDefault() != null) + || field.isOptional() + || MetadataColumns.metadataFieldIds().contains(field.fieldId()), "Missing required field: %s", field.name()); + // If the field has an initial default value, do not project it in the read schema. + // The default will be added in {@link org.apache.iceberg.avro.ValueReader}. + if (field.initialDefault() != null) { + continue; + } + // Create a field that will be defaulted to null. We assign a unique suffix to the field // to make sure that even if records in the file have the field it is not projected. Schema.Field newField = diff --git a/core/src/main/java/org/apache/iceberg/avro/ValueReaders.java b/core/src/main/java/org/apache/iceberg/avro/ValueReaders.java index 19789cce82fc..65419b3dcc9b 100644 --- a/core/src/main/java/org/apache/iceberg/avro/ValueReaders.java +++ b/core/src/main/java/org/apache/iceberg/avro/ValueReaders.java @@ -31,6 +31,7 @@ import java.util.LinkedList; import java.util.List; import java.util.Map; +import java.util.Set; import java.util.UUID; import java.util.function.Supplier; import org.apache.avro.Schema; @@ -41,8 +42,10 @@ import org.apache.avro.util.Utf8; import org.apache.iceberg.MetadataColumns; import org.apache.iceberg.common.DynConstructors; +import org.apache.iceberg.relocated.com.google.common.collect.ImmutableMap; import org.apache.iceberg.relocated.com.google.common.collect.Lists; import org.apache.iceberg.relocated.com.google.common.collect.Maps; +import org.apache.iceberg.relocated.com.google.common.collect.Sets; import org.apache.iceberg.types.Types; import org.apache.iceberg.util.UUIDUtil; @@ -566,8 +569,10 @@ public Map read(Decoder decoder, Object reuse) throws IOException { public abstract static class StructReader implements ValueReader, SupportsRowPosition { private final ValueReader[] readers; - private final int[] positions; - private final Object[] constants; + private final int[] constantPositions; + private final Object[] constantValues; + private final int[] defaultPositions; + private final Object[] defaultValues; private int posField = -1; protected StructReader(List> readers, Schema schema) { @@ -586,37 +591,68 @@ protected StructReader(List> readers, Schema schema) { } if (isDeletedColumnPos == null) { - this.positions = new int[0]; - this.constants = new Object[0]; + this.constantPositions = new int[0]; + this.constantValues = new Object[0]; } else { - this.positions = new int[] {isDeletedColumnPos}; - this.constants = new Object[] {false}; + this.constantPositions = new int[] {isDeletedColumnPos}; + this.constantValues = new Object[] {false}; } + this.defaultPositions = new int[0]; + this.defaultValues = new Object[0]; } protected StructReader( List> readers, Types.StructType struct, Map idToConstant) { + this(readers, struct, idToConstant, ImmutableMap.of()); + } + + protected StructReader( + List> readers, + Types.StructType struct, + Map idToConstant, + Map idToDefault) { this.readers = readers.toArray(new ValueReader[0]); List fields = struct.fields(); - List positionList = Lists.newArrayListWithCapacity(fields.size()); - List constantList = Lists.newArrayListWithCapacity(fields.size()); + List constantPositionsList = Lists.newArrayListWithCapacity(fields.size()); + List constantValuesList = Lists.newArrayListWithCapacity(fields.size()); + List defaultPositionList = Lists.newArrayListWithCapacity(fields.size()); + List defaultValuesList = Lists.newArrayListWithCapacity(fields.size()); for (int pos = 0; pos < fields.size(); pos += 1) { Types.NestedField field = fields.get(pos); if (idToConstant.containsKey(field.fieldId())) { - positionList.add(pos); - constantList.add(idToConstant.get(field.fieldId())); + constantPositionsList.add(pos); + constantValuesList.add(idToConstant.get(field.fieldId())); } else if (field.fieldId() == MetadataColumns.ROW_POSITION.fieldId()) { // track where the _pos field is located for setRowPositionSupplier this.posField = pos; } else if (field.fieldId() == MetadataColumns.IS_DELETED.fieldId()) { - positionList.add(pos); - constantList.add(false); + constantPositionsList.add(pos); + constantValuesList.add(false); + } else if (field.initialDefault() != null) { + if (idToDefault.containsKey(field.fieldId())) { + // Add a constant value for fields that have a default value. + // In the {@link #read()} method, this will be leveraged only if there is no + // corresponding + // reader. + defaultPositionList.add(pos); + defaultValuesList.add(idToDefault.get(field.fieldId())); + } else { + // Throw an exception if the map does not contain a default value for that field. + throw new UnsupportedOperationException( + "Default value not found for field: " + + field.name() + + " (field ID: " + + field.fieldId() + + ")."); + } } } - this.positions = positionList.stream().mapToInt(Integer::intValue).toArray(); - this.constants = constantList.toArray(); + this.constantPositions = constantPositionsList.stream().mapToInt(Integer::intValue).toArray(); + this.constantValues = constantValuesList.toArray(); + this.defaultPositions = defaultPositionList.stream().mapToInt(Integer::intValue).toArray(); + this.defaultValues = defaultValuesList.toArray(); } @Override @@ -654,10 +690,20 @@ public S read(Decoder decoder, Object reuse) throws IOException { S struct = reuseOrCreate(reuse); if (decoder instanceof ResolvingDecoder) { - // this may not set all of the fields. nulls are set by default. + Set existingFieldPositionsSet = Sets.newHashSet(); + // This may not set all of the fields. nulls are set by default. for (Schema.Field field : ((ResolvingDecoder) decoder).readFieldOrder()) { Object reusedValue = get(struct, field.pos()); set(struct, field.pos(), readers[field.pos()].read(decoder, reusedValue)); + existingFieldPositionsSet.add(field.pos()); + } + + // Set default values + for (int i = 0; i < defaultPositions.length; i += 1) { + // Set default values only if the field does not exist in the data. + if (!existingFieldPositionsSet.contains(defaultPositions[i])) { + set(struct, defaultPositions[i], defaultValues[i]); + } } } else { @@ -667,8 +713,8 @@ public S read(Decoder decoder, Object reuse) throws IOException { } } - for (int i = 0; i < positions.length; i += 1) { - set(struct, positions[i], constants[i]); + for (int i = 0; i < constantPositions.length; i += 1) { + set(struct, constantPositions[i], constantValues[i]); } return struct; diff --git a/core/src/main/java/org/apache/iceberg/data/IdentityPartitionConverters.java b/core/src/main/java/org/apache/iceberg/data/IdentityPartitionConverters.java index 4cb41263152d..c19d75f0003f 100644 --- a/core/src/main/java/org/apache/iceberg/data/IdentityPartitionConverters.java +++ b/core/src/main/java/org/apache/iceberg/data/IdentityPartitionConverters.java @@ -18,9 +18,11 @@ */ package org.apache.iceberg.data; +import java.nio.ByteBuffer; import org.apache.avro.generic.GenericData; import org.apache.iceberg.types.Type; import org.apache.iceberg.types.Types; +import org.apache.iceberg.util.ByteBuffers; import org.apache.iceberg.util.DateTimeUtil; public class IdentityPartitionConverters { @@ -46,7 +48,9 @@ public static Object convertConstant(Type type, Object value) { return DateTimeUtil.timestampFromMicros((Long) value); } case FIXED: - if (value instanceof GenericData.Fixed) { + if (value instanceof ByteBuffer) { + return ByteBuffers.toByteArray((ByteBuffer) value); + } else if (value instanceof GenericData.Fixed) { return ((GenericData.Fixed) value).bytes(); } return value; diff --git a/core/src/main/java/org/apache/iceberg/data/avro/GenericReaders.java b/core/src/main/java/org/apache/iceberg/data/avro/GenericReaders.java index 91a728d53d38..d77a9de50f60 100644 --- a/core/src/main/java/org/apache/iceberg/data/avro/GenericReaders.java +++ b/core/src/main/java/org/apache/iceberg/data/avro/GenericReaders.java @@ -29,7 +29,10 @@ import org.apache.iceberg.avro.ValueReader; import org.apache.iceberg.avro.ValueReaders; import org.apache.iceberg.data.GenericRecord; +import org.apache.iceberg.data.IdentityPartitionConverters; import org.apache.iceberg.data.Record; +import org.apache.iceberg.relocated.com.google.common.collect.Maps; +import org.apache.iceberg.types.Types; import org.apache.iceberg.types.Types.StructType; import org.apache.iceberg.util.DateTimeUtil; @@ -106,7 +109,7 @@ private static class GenericRecordReader extends ValueReaders.StructReader> readers, StructType struct, Map idToConstant) { - super(readers, struct, idToConstant); + super(readers, struct, idToConstant, idToDefault(struct)); this.structType = struct; } @@ -128,5 +131,17 @@ protected Object get(Record struct, int pos) { protected void set(Record struct, int pos, Object value) { struct.set(pos, value); } + + private static Map idToDefault(StructType struct) { + Map result = Maps.newHashMap(); + for (Types.NestedField field : struct.fields()) { + if (field.initialDefault() != null) { + result.put( + field.fieldId(), + IdentityPartitionConverters.convertConstant(field.type(), field.initialDefault())); + } + } + return result; + } } } diff --git a/data/src/test/java/org/apache/iceberg/data/avro/TestReadDefaultValues.java b/data/src/test/java/org/apache/iceberg/data/avro/TestReadDefaultValues.java new file mode 100644 index 000000000000..8b7cee9bcba6 --- /dev/null +++ b/data/src/test/java/org/apache/iceberg/data/avro/TestReadDefaultValues.java @@ -0,0 +1,203 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.iceberg.data.avro; + +import static org.apache.iceberg.types.Types.NestedField.optional; +import static org.apache.iceberg.types.Types.NestedField.required; + +import java.io.File; +import java.io.IOException; +import java.util.List; +import org.apache.iceberg.Files; +import org.apache.iceberg.Schema; +import org.apache.iceberg.SingleValueParser; +import org.apache.iceberg.avro.Avro; +import org.apache.iceberg.avro.AvroIterable; +import org.apache.iceberg.data.DataTestHelpers; +import org.apache.iceberg.data.GenericRecord; +import org.apache.iceberg.data.IdentityPartitionConverters; +import org.apache.iceberg.data.Record; +import org.apache.iceberg.io.FileAppender; +import org.apache.iceberg.relocated.com.google.common.collect.Lists; +import org.apache.iceberg.types.Type; +import org.apache.iceberg.types.Types; +import org.junit.Assert; +import org.junit.Rule; +import org.junit.Test; +import org.junit.rules.TemporaryFolder; + +public class TestReadDefaultValues { + + @Rule public TemporaryFolder temp = new TemporaryFolder(); + + private static final Object[][] typesWithDefaults = + new Object[][] { + {Types.BooleanType.get(), "true"}, + {Types.IntegerType.get(), "1"}, + {Types.LongType.get(), "9999999"}, + {Types.FloatType.get(), "1.23"}, + {Types.DoubleType.get(), "123.456"}, + {Types.DateType.get(), "\"2007-12-03\""}, + {Types.TimeType.get(), "\"10:15:30\""}, + {Types.TimestampType.withoutZone(), "\"2007-12-03T10:15:30\""}, + {Types.TimestampType.withZone(), "\"2007-12-03T10:15:30+00:00\""}, + {Types.StringType.get(), "\"foo\""}, + {Types.UUIDType.get(), "\"eb26bdb1-a1d8-4aa6-990e-da940875492c\""}, + {Types.FixedType.ofLength(2), "\"111f\""}, + {Types.BinaryType.get(), "\"0000ff\""}, + {Types.DecimalType.of(9, 4), "\"123.4500\""}, + {Types.DecimalType.of(9, 0), "\"2\""}, + // Avro doesn't support negative scale + // {Types.DecimalType.of(9, -20), "\"2E+20\""}, + {Types.ListType.ofOptional(1, Types.IntegerType.get()), "[1, 2, 3]"}, + { + Types.MapType.ofOptional(2, 3, Types.IntegerType.get(), Types.StringType.get()), + "{\"keys\": [1, 2], \"values\": [\"foo\", \"bar\"]}" + }, + { + Types.StructType.of( + required(4, "f1", Types.IntegerType.get()), + optional(5, "f2", Types.StringType.get())), + "{\"4\": 1, \"5\": \"bar\"}" + }, + // deeply nested complex types + { + Types.ListType.ofOptional( + 6, + Types.StructType.of( + required(7, "f1", Types.IntegerType.get()), + optional(8, "f2", Types.StringType.get()))), + "[{\"7\": 1, \"8\": \"bar\"}, {\"7\": 2, \"8\": " + "\"foo\"}]" + }, + { + Types.MapType.ofOptional( + 9, + 10, + Types.IntegerType.get(), + Types.StructType.of( + required(11, "f1", Types.IntegerType.get()), + optional(12, "f2", Types.StringType.get()))), + "{\"keys\": [1, 2], \"values\": [{\"11\": 1, \"12\": \"bar\"}, {\"11\": 2, \"12\": \"foo\"}]}" + }, + { + Types.StructType.of( + required( + 13, + "f1", + Types.StructType.of( + optional(14, "ff1", Types.IntegerType.get()), + optional(15, "ff2", Types.StringType.get()))), + optional( + 16, + "f2", + Types.StructType.of( + optional(17, "ff1", Types.StringType.get()), + optional(18, "ff2", Types.IntegerType.get())))), + "{\"13\": {\"14\": 1, \"15\": \"bar\"}, \"16\": {\"17\": \"bar\", \"18\": 1}}" + }, + }; + + @Test + public void testDefaultValueApplied() throws IOException { + for (Object[] typeWithDefault : typesWithDefaults) { + Type type = (Type) typeWithDefault[0]; + String defaultValueJson = (String) typeWithDefault[1]; + Object defaultValue = SingleValueParser.fromJson(type, defaultValueJson); + + Schema writerSchema = new Schema(required(999, "col1", Types.IntegerType.get())); + + Schema readerSchema = + new Schema( + required(999, "col1", Types.IntegerType.get()), + Types.NestedField.optional(1000, "col2", type, null, defaultValue, defaultValue)); + + Record expectedRecord = GenericRecord.create(readerSchema); + expectedRecord.set(0, 1); + expectedRecord.set(1, IdentityPartitionConverters.convertConstant(type, defaultValue)); + + File testFile = temp.newFile(); + Assert.assertTrue("Delete should succeed", testFile.delete()); + + try (FileAppender writer = + Avro.write(Files.localOutput(testFile)) + .schema(writerSchema) + .createWriterFunc(DataWriter::create) + .named("test") + .build()) { + Record record = GenericRecord.create(writerSchema); + record.set(0, 1); + writer.add(record); + } + + List rows; + try (AvroIterable reader = + Avro.read(Files.localInput(testFile)) + .project(readerSchema) + .createReaderFunc(DataReader::create) + .build()) { + rows = Lists.newArrayList(reader); + } + + DataTestHelpers.assertEquals(readerSchema.asStruct(), expectedRecord, rows.get(0)); + } + } + + @Test + public void testDefaultValueNotApplied() throws IOException { + for (Object[] typeWithDefault : typesWithDefaults) { + Type type = (Type) typeWithDefault[0]; + String defaultValueJson = (String) typeWithDefault[1]; + Object defaultValue = SingleValueParser.fromJson(type, defaultValueJson); + + Schema readerSchema = + new Schema( + required(999, "col1", Types.IntegerType.get()), + Types.NestedField.optional(1000, "col2", type, null, defaultValue, defaultValue)); + + // Create a record with null value for the column with default value + Record record = GenericRecord.create(readerSchema); + record.set(0, 1); + record.set(1, null); + + File testFile = temp.newFile(); + Assert.assertTrue("Delete should succeed", testFile.delete()); + + try (FileAppender writer = + Avro.write(Files.localOutput(testFile)) + .schema(readerSchema) + .createWriterFunc(DataWriter::create) + .named("test") + .build()) { + writer.add(record); + } + + List rows; + try (AvroIterable reader = + Avro.read(Files.localInput(testFile)) + .project(readerSchema) + .createReaderFunc(DataReader::create) + .build()) { + rows = Lists.newArrayList(reader); + } + + // Existence of default value should not affect the read result + DataTestHelpers.assertEquals(readerSchema.asStruct(), record, rows.get(0)); + } + } +} diff --git a/flink/v1.17/flink/src/main/java/org/apache/iceberg/flink/data/FlinkValueReaders.java b/flink/v1.17/flink/src/main/java/org/apache/iceberg/flink/data/FlinkValueReaders.java index 32f6c3a2ccfd..272813e937ba 100644 --- a/flink/v1.17/flink/src/main/java/org/apache/iceberg/flink/data/FlinkValueReaders.java +++ b/flink/v1.17/flink/src/main/java/org/apache/iceberg/flink/data/FlinkValueReaders.java @@ -287,6 +287,7 @@ private static class StructReader extends ValueReaders.StructReader { private StructReader( List> readers, Types.StructType struct, Map idToConstant) { + // TODO: Support passing default value map. super(readers, struct, idToConstant); this.numFields = readers.size(); } diff --git a/spark/v3.4/spark/src/main/java/org/apache/iceberg/spark/data/SparkValueReaders.java b/spark/v3.4/spark/src/main/java/org/apache/iceberg/spark/data/SparkValueReaders.java index 3cbf38d88bf4..13c2e11e39eb 100644 --- a/spark/v3.4/spark/src/main/java/org/apache/iceberg/spark/data/SparkValueReaders.java +++ b/spark/v3.4/spark/src/main/java/org/apache/iceberg/spark/data/SparkValueReaders.java @@ -254,6 +254,7 @@ static class StructReader extends ValueReaders.StructReader { protected StructReader( List> readers, Types.StructType struct, Map idToConstant) { + // TODO: Support passing default value map super(readers, struct, idToConstant); this.numFields = readers.size(); }