diff --git a/api/src/main/java/org/apache/iceberg/types/Types.java b/api/src/main/java/org/apache/iceberg/types/Types.java index 1b050725afd0..b4d43d8b577b 100644 --- a/api/src/main/java/org/apache/iceberg/types/Types.java +++ b/api/src/main/java/org/apache/iceberg/types/Types.java @@ -415,27 +415,42 @@ public int hashCode() { public static class NestedField implements Serializable { public static NestedField optional(int id, String name, Type type) { - return new NestedField(true, id, name, type, null); + return new NestedField(true, id, name, type, null, null, null); } public static NestedField optional(int id, String name, Type type, String doc) { - return new NestedField(true, id, name, type, doc); + return new NestedField(true, id, name, type, doc, null, null); + } + + public static NestedField optional(int id, String name, Type type, String doc, + Object initialDefault, Object writeDefault) { + return new NestedField(true, id, name, type, doc, initialDefault, writeDefault); } public static NestedField required(int id, String name, Type type) { - return new NestedField(false, id, name, type, null); + return new NestedField(false, id, name, type, null, null, null); } public static NestedField required(int id, String name, Type type, String doc) { - return new NestedField(false, id, name, type, doc); + return new NestedField(false, id, name, type, doc, null, null); + } + + public static NestedField required(int id, String name, Type type, String doc, + Object initialDefault, Object writeDefault) { + return new NestedField(false, id, name, type, doc, initialDefault, writeDefault); } public static NestedField of(int id, boolean isOptional, String name, Type type) { - return new NestedField(isOptional, id, name, type, null); + return new NestedField(isOptional, id, name, type, null, null, null); } public static NestedField of(int id, boolean isOptional, String name, Type type, String doc) { - return new NestedField(isOptional, id, name, type, doc); + return new NestedField(isOptional, id, name, type, doc, null, null); + } + + public static NestedField of(int id, boolean isOptional, String name, Type type, String doc, + Object initialDefault, Object writeDefault) { + return new NestedField(isOptional, id, name, type, doc, initialDefault, writeDefault); } private final boolean isOptional; @@ -443,8 +458,12 @@ public static NestedField of(int id, boolean isOptional, String name, Type type, private final String name; private final Type type; private final String doc; + private final Object initialDefault; + private final Object writeDefault; + - private NestedField(boolean isOptional, int id, String name, Type type, String doc) { + private NestedField(boolean isOptional, int id, String name, Type type, String doc, + Object initialDefault, Object writeDefault) { Preconditions.checkNotNull(name, "Name cannot be null"); Preconditions.checkNotNull(type, "Type cannot be null"); this.isOptional = isOptional; @@ -452,6 +471,8 @@ private NestedField(boolean isOptional, int id, String name, Type type, String d this.name = name; this.type = type; this.doc = doc; + this.initialDefault = initialDefault; + this.writeDefault = writeDefault; } public boolean isOptional() { @@ -462,7 +483,7 @@ public NestedField asOptional() { if (isOptional) { return this; } - return new NestedField(true, id, name, type, doc); + return new NestedField(true, id, name, type, doc, initialDefault, writeDefault); } public boolean isRequired() { @@ -473,7 +494,11 @@ public NestedField asRequired() { if (!isOptional) { return this; 
} - return new NestedField(false, id, name, type, doc); + return new NestedField(false, id, name, type, doc, initialDefault, writeDefault); + } + + public NestedField updateWriteDefault(Object newWriteDefault) { + return new NestedField(isOptional, id, name, type, doc, initialDefault, newWriteDefault); } public int fieldId() { @@ -492,6 +517,14 @@ public String doc() { return doc; } + public Object initialDefaultValue() { + return initialDefault; + } + + public Object writeDefaultValue() { + return writeDefault; + } + @Override public String toString() { return String.format("%d: %s: %s %s", diff --git a/core/src/main/java/org/apache/iceberg/avro/AvroSchemaUtil.java b/core/src/main/java/org/apache/iceberg/avro/AvroSchemaUtil.java index 71faf3bc1ce6..279ad14690bc 100644 --- a/core/src/main/java/org/apache/iceberg/avro/AvroSchemaUtil.java +++ b/core/src/main/java/org/apache/iceberg/avro/AvroSchemaUtil.java @@ -49,6 +49,8 @@ private AvroSchemaUtil() { public static final String ELEMENT_ID_PROP = "element-id"; public static final String ADJUST_TO_UTC_PROP = "adjust-to-utc"; + public static final String SHOULD_USE_INIT_DEFAULT = "_default_exist"; + private static final Schema NULL = Schema.create(Schema.Type.NULL); private static final Schema.Type MAP = Schema.Type.MAP; private static final Schema.Type ARRAY = Schema.Type.ARRAY; diff --git a/core/src/main/java/org/apache/iceberg/avro/AvroSchemaWithTypeVisitor.java b/core/src/main/java/org/apache/iceberg/avro/AvroSchemaWithTypeVisitor.java index e6f1c6eb5097..1fc7bb7b6350 100644 --- a/core/src/main/java/org/apache/iceberg/avro/AvroSchemaWithTypeVisitor.java +++ b/core/src/main/java/org/apache/iceberg/avro/AvroSchemaWithTypeVisitor.java @@ -35,7 +35,7 @@ public static T visit(org.apache.iceberg.Schema iSchema, Schema schema, Avro public static T visit(Type iType, Schema schema, AvroSchemaWithTypeVisitor visitor) { switch (schema.getType()) { case RECORD: - return visitRecord(iType != null ? iType.asStructType() : null, schema, visitor); + return visitor.visitRecord(iType != null ? iType.asStructType() : null, schema); case UNION: return visitUnion(iType, schema, visitor); @@ -53,13 +53,13 @@ public static T visit(Type iType, Schema schema, AvroSchemaWithTypeVisitor T visitRecord(Types.StructType struct, Schema record, AvroSchemaWithTypeVisitor visitor) { + protected T visitRecord(Types.StructType struct, Schema record) { // check to make sure this hasn't been visited before String name = record.getFullName(); - Preconditions.checkState(!visitor.recordLevels.contains(name), + Preconditions.checkState(!recordLevels.contains(name), "Cannot process recursive Avro record %s", name); - visitor.recordLevels.push(name); + recordLevels.push(name); List fields = record.getFields(); List names = Lists.newArrayListWithExpectedSize(fields.size()); @@ -68,12 +68,12 @@ private static T visitRecord(Types.StructType struct, Schema record, AvroSch int fieldId = AvroSchemaUtil.getFieldId(field); Types.NestedField iField = struct != null ? struct.field(fieldId) : null; names.add(field.name()); - results.add(visit(iField != null ? iField.type() : null, field.schema(), visitor)); + results.add(visit(iField != null ? 
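Note: the NestedField changes above carry two per-field defaults: an initial default (what readers return for files written before the column existed) and a write default (intended for writers when a record omits the column). A minimal usage sketch under those semantics; the column names, ids, and values are illustrative and not part of the patch:

    import org.apache.iceberg.Schema;
    import org.apache.iceberg.types.Types;

    public class DefaultValueSchemaSketch {
      public static Schema evolvedSchema() {
        return new Schema(
            Types.NestedField.required(1, "id", Types.IntegerType.get()),
            // A required column added after data was written: existing rows read back as "unknown".
            Types.NestedField.required(2, "category", Types.StringType.get(), "added later",
                "unknown" /* initialDefault */, "n/a" /* writeDefault */));
      }
    }

The write default can later be swapped without touching the initial default via field.updateWriteDefault(newValue), which returns a new NestedField since the class is immutable.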
iField.type() : null, field.schema(), this)); } - visitor.recordLevels.pop(); + recordLevels.pop(); - return visitor.record(struct, record, names, results); + return record(struct, record, names, results); } private static T visitUnion(Type type, Schema union, AvroSchemaWithTypeVisitor visitor) { @@ -107,7 +107,7 @@ private static T visitArray(Type type, Schema array, AvroSchemaWithTypeVisit } } - private Deque recordLevels = Lists.newLinkedList(); + protected Deque recordLevels = Lists.newLinkedList(); public T record(Types.StructType iStruct, Schema record, List names, List fields) { return null; diff --git a/core/src/main/java/org/apache/iceberg/avro/BuildAvroProjection.java b/core/src/main/java/org/apache/iceberg/avro/BuildAvroProjection.java index f708f556fff9..9bedd3ccee53 100644 --- a/core/src/main/java/org/apache/iceberg/avro/BuildAvroProjection.java +++ b/core/src/main/java/org/apache/iceberg/avro/BuildAvroProjection.java @@ -99,16 +99,25 @@ public Schema record(Schema record, List names, Iterable s if (avroField != null) { updatedFields.add(avroField); - } else { Preconditions.checkArgument( - field.isOptional() || MetadataColumns.metadataFieldIds().contains(field.fieldId()), + (field.isRequired() && field.initialDefaultValue() != null) || + field.isOptional() || MetadataColumns.metadataFieldIds().contains(field.fieldId()), "Missing required field: %s", field.name()); // Create a field that will be defaulted to null. We assign a unique suffix to the field // to make sure that even if records in the file have the field it is not projected. Schema.Field newField = new Schema.Field( field.name() + "_r" + field.fieldId(), AvroSchemaUtil.toOption(AvroSchemaUtil.convert(field.type())), null, JsonProperties.NULL_VALUE); + // If the field from Iceberg schema has initial default value, we give a special + // mark to this created avro field, so that in the later stage, the reader can identify + // this field and use a constant reader to read the field, rather than returning null + // as delegated from avro file reader + if (field.initialDefaultValue() != null) { + newField.addProp(AvroSchemaUtil.SHOULD_USE_INIT_DEFAULT, true); + } else { + newField.addProp(AvroSchemaUtil.SHOULD_USE_INIT_DEFAULT, false); + } newField.addProp(AvroSchemaUtil.FIELD_ID_PROP, field.fieldId()); updatedFields.add(newField); hasChange = true; @@ -146,7 +155,6 @@ public Schema.Field field(Schema.Field field, Supplier fieldResult) { // always copy because fields can't be reused return AvroSchemaUtil.copyField(field, field.schema(), field.name()); } - } finally { this.current = struct; } @@ -191,11 +199,9 @@ public Schema array(Schema array, Supplier element) { } return array; - } finally { this.current = asMapType; } - } else { Preconditions.checkArgument(current.isListType(), "Incompatible projected type: %s", current); @@ -210,7 +216,6 @@ public Schema array(Schema array, Supplier element) { } return array; - } finally { this.current = list; } @@ -234,7 +239,6 @@ public Schema map(Schema map, Supplier value) { } return map; - } finally { this.current = asMapType; } @@ -260,5 +264,4 @@ public Schema primitive(Schema primitive) { return primitive; } } - } diff --git a/orc/src/main/java/org/apache/iceberg/orc/ORCSchemaUtil.java b/orc/src/main/java/org/apache/iceberg/orc/ORCSchemaUtil.java index 4356f6b48c87..2ded96a4b017 100644 --- a/orc/src/main/java/org/apache/iceberg/orc/ORCSchemaUtil.java +++ b/orc/src/main/java/org/apache/iceberg/orc/ORCSchemaUtil.java @@ -267,6 +267,12 @@ private static TypeDescription 
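Note: BuildAvroProjection now tags the synthesized `<name>_r<fieldId>` field with the `_default_exist` property (AvroSchemaUtil.SHOULD_USE_INIT_DEFAULT) so downstream readers can distinguish "fill with the initial default" from the pre-existing "fill with null" behavior. A hedged sketch of how reader-side code can consume that marker; it mirrors what the Spark visitor later in this patch does, and the helper class itself is hypothetical:

    import org.apache.avro.Schema;
    import org.apache.iceberg.avro.AvroSchemaUtil;

    class InitialDefaultFlag {
      // Returns true only when BuildAvroProjection synthesized this field for a column
      // that has an initial default; an absent property means a normal, file-backed field.
      static boolean shouldUseInitialDefault(Schema.Field field) {
        Object flag = field.getObjectProp(AvroSchemaUtil.SHOULD_USE_INIT_DEFAULT);
        return flag instanceof Boolean && (Boolean) flag;
      }
    }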
buildOrcProjection(Integer fieldId, Type type, bo case STRUCT: orcType = TypeDescription.createStruct(); for (Types.NestedField nestedField : type.asStructType().fields()) { + // When we have a new evolved field in Iceberg schema which has an initial default value, + // but the underlying orc file lacks that field, we ignore projecting this field to the orc + // file reader schema, but instead populate this field inside Iceberg using a ConstantReader + if (mapping.get(nestedField.fieldId()) == null && nestedField.initialDefaultValue() != null) { + continue; + } // Using suffix _r to avoid potential underlying issues in ORC reader // with reused column names between ORC and Iceberg; // e.g. renaming column c -> d and adding new column d diff --git a/orc/src/main/java/org/apache/iceberg/orc/OrcSchemaWithTypeVisitor.java b/orc/src/main/java/org/apache/iceberg/orc/OrcSchemaWithTypeVisitor.java index 53b0c9f2fdeb..d255d9b0cd98 100644 --- a/orc/src/main/java/org/apache/iceberg/orc/OrcSchemaWithTypeVisitor.java +++ b/orc/src/main/java/org/apache/iceberg/orc/OrcSchemaWithTypeVisitor.java @@ -35,7 +35,7 @@ public static T visit( public static T visit(Type iType, TypeDescription schema, OrcSchemaWithTypeVisitor visitor) { switch (schema.getCategory()) { case STRUCT: - return visitRecord(iType != null ? iType.asStructType() : null, schema, visitor); + return visitor.visitRecord(iType != null ? iType.asStructType() : null, schema, visitor); case UNION: throw new UnsupportedOperationException("Cannot handle " + schema); @@ -58,7 +58,7 @@ public static T visit(Type iType, TypeDescription schema, OrcSchemaWithTypeV } } - private static T visitRecord( + public T visitRecord( Types.StructType struct, TypeDescription record, OrcSchemaWithTypeVisitor visitor) { List fields = record.getChildren(); List names = record.getFieldNames(); diff --git a/spark/v3.2/spark/src/main/java/org/apache/iceberg/spark/data/SparkAvroReader.java b/spark/v3.2/spark/src/main/java/org/apache/iceberg/spark/data/SparkAvroReader.java index c693e2e2c057..acb2238c557a 100644 --- a/spark/v3.2/spark/src/main/java/org/apache/iceberg/spark/data/SparkAvroReader.java +++ b/spark/v3.2/spark/src/main/java/org/apache/iceberg/spark/data/SparkAvroReader.java @@ -73,17 +73,16 @@ public void setRowPositionSupplier(Supplier posSupplier) { } } - private static class ReadBuilder extends AvroSchemaWithTypeVisitor> { - private final Map idToConstant; + private static class ReadBuilder extends SparkDefaultValueAwareAvroSchemaWithTypeVisitor> { private ReadBuilder(Map idToConstant) { - this.idToConstant = idToConstant; + super(idToConstant); } @Override public ValueReader record(Types.StructType expected, Schema record, List names, List> fields) { - return SparkValueReaders.struct(fields, expected, idToConstant); + return SparkValueReaders.struct(fields, expected, getIdToConstant()); } @Override diff --git a/spark/v3.2/spark/src/main/java/org/apache/iceberg/spark/data/SparkDefaultValueAwareAvroSchemaWithTypeVisitor.java b/spark/v3.2/spark/src/main/java/org/apache/iceberg/spark/data/SparkDefaultValueAwareAvroSchemaWithTypeVisitor.java new file mode 100644 index 000000000000..795d1a5559d8 --- /dev/null +++ b/spark/v3.2/spark/src/main/java/org/apache/iceberg/spark/data/SparkDefaultValueAwareAvroSchemaWithTypeVisitor.java @@ -0,0 +1,75 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. 
The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.apache.iceberg.spark.data; + +import java.util.List; +import java.util.Map; +import org.apache.avro.Schema; +import org.apache.iceberg.avro.AvroSchemaUtil; +import org.apache.iceberg.avro.AvroSchemaWithTypeVisitor; +import org.apache.iceberg.relocated.com.google.common.base.Preconditions; +import org.apache.iceberg.relocated.com.google.common.collect.Lists; +import org.apache.iceberg.relocated.com.google.common.collect.Maps; +import org.apache.iceberg.spark.source.BaseDataReader; +import org.apache.iceberg.types.Types; + +public abstract class SparkDefaultValueAwareAvroSchemaWithTypeVisitor extends AvroSchemaWithTypeVisitor { + + private final Map idToConstant; + + protected Map getIdToConstant() { + return idToConstant; + } + + protected SparkDefaultValueAwareAvroSchemaWithTypeVisitor(Map idToConstant) { + this.idToConstant = Maps.newHashMap(); + this.idToConstant.putAll(idToConstant); + } + + @Override + public T visitRecord(Types.StructType struct, Schema record) { + // check to make sure this hasn't been visited before + String name = record.getFullName(); + Preconditions.checkState(!recordLevels.contains(name), + "Cannot process recursive Avro record %s", name); + + recordLevels.push(name); + + List fields = record.getFields(); + List names = Lists.newArrayListWithExpectedSize(fields.size()); + List results = Lists.newArrayListWithExpectedSize(fields.size()); + for (Schema.Field field : fields) { + int fieldId = AvroSchemaUtil.getFieldId(field); + Types.NestedField iField = struct != null ? struct.field(fieldId) : null; + Object shouldUseDefaultFlag = field.getObjectProp(AvroSchemaUtil.SHOULD_USE_INIT_DEFAULT); + if (iField != null && shouldUseDefaultFlag != null && (Boolean) shouldUseDefaultFlag) { + idToConstant.put( + fieldId, + BaseDataReader.convertConstant(iField.type(), iField.initialDefaultValue())); + } + names.add(field.name()); + results.add(visit(iField != null ? iField.type() : null, field.schema(), this)); + } + + recordLevels.pop(); + + return record(struct, record, names, results); + } +} diff --git a/spark/v3.2/spark/src/main/java/org/apache/iceberg/spark/data/SparkDefaultValueAwareOrcSchemaWithTypeVisitor.java b/spark/v3.2/spark/src/main/java/org/apache/iceberg/spark/data/SparkDefaultValueAwareOrcSchemaWithTypeVisitor.java new file mode 100644 index 000000000000..3dd1f4fa3443 --- /dev/null +++ b/spark/v3.2/spark/src/main/java/org/apache/iceberg/spark/data/SparkDefaultValueAwareOrcSchemaWithTypeVisitor.java @@ -0,0 +1,80 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. 
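Note: on the Avro read path, SparkDefaultValueAwareAvroSchemaWithTypeVisitor folds initial defaults into the same idToConstant map that already carries partition and metadata constants, so SparkValueReaders.struct can emit them without a dedicated reader type. A rough sketch of the entry it builds for a single evolved string column (field id and value are illustrative); BaseDataReader.convertConstant converts to Spark's internal representation, so the stored value is a UTF8String rather than a java.lang.String:

    import java.util.Map;
    import org.apache.iceberg.relocated.com.google.common.collect.ImmutableMap;
    import org.apache.iceberg.spark.source.BaseDataReader;
    import org.apache.iceberg.types.Types;

    class IdToConstantSketch {
      // Roughly what the visitor adds when it sees the _default_exist marker on field id 2.
      static Map<Integer, ?> forEvolvedStringColumn() {
        return ImmutableMap.of(
            2, BaseDataReader.convertConstant(Types.StringType.get(), "unknown"));
      }
    }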
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + + +package org.apache.iceberg.spark.data; + +import java.util.List; +import java.util.Map; +import org.apache.iceberg.MetadataColumns; +import org.apache.iceberg.orc.ORCSchemaUtil; +import org.apache.iceberg.orc.OrcSchemaWithTypeVisitor; +import org.apache.iceberg.relocated.com.google.common.collect.Lists; +import org.apache.iceberg.relocated.com.google.common.collect.Maps; +import org.apache.iceberg.spark.source.BaseDataReader; +import org.apache.iceberg.types.Types; +import org.apache.orc.TypeDescription; + +public abstract class SparkDefaultValueAwareOrcSchemaWithTypeVisitor extends OrcSchemaWithTypeVisitor { + + private final Map idToConstant; + + protected Map getIdToConstant() { + return idToConstant; + } + + protected SparkDefaultValueAwareOrcSchemaWithTypeVisitor(Map idToConstant) { + this.idToConstant = Maps.newHashMap(); + this.idToConstant.putAll(idToConstant); + } + + /** + * This visitor differs from the parent visitor on how it handles Struct type visiting, + * particularly, it assumes the iceberg schema represented by {@code struct} might have + * additional fields than the ORCSchemaUtil.buildOrcProjection projected orc file reader + * schema {@code record}, at this point, we know for sure those additional fields have + * initial default values, and we build an idToConstant map here to supply to ORC value + * readers. + */ + @Override + public T visitRecord( + Types.StructType struct, TypeDescription record, OrcSchemaWithTypeVisitor visitor) { + + List iFields = struct.fields(); + List fields = record.getChildren(); + List names = record.getFieldNames(); + List results = Lists.newArrayListWithExpectedSize(fields.size()); + + for (int i = 0, j = 0; i < iFields.size(); i++) { + Types.NestedField iField = iFields.get(i); + TypeDescription field = j < fields.size() ? 
fields.get(j) : null; + if (field == null || (iField.fieldId() != ORCSchemaUtil.fieldId(field))) { + if (!MetadataColumns.isMetadataColumn(iField.name()) && !idToConstant.containsKey(iField.fieldId())) { + idToConstant.put( + iField.fieldId(), + BaseDataReader.convertConstant(iField.type(), iField.initialDefaultValue())); + } + } else { + results.add(visit(iField.type(), field, visitor)); + j++; + } + } + return visitor.record(struct, record, names, results); + } +} diff --git a/spark/v3.2/spark/src/main/java/org/apache/iceberg/spark/data/SparkOrcReader.java b/spark/v3.2/spark/src/main/java/org/apache/iceberg/spark/data/SparkOrcReader.java index 4ed6420a9aa4..24776406a63f 100644 --- a/spark/v3.2/spark/src/main/java/org/apache/iceberg/spark/data/SparkOrcReader.java +++ b/spark/v3.2/spark/src/main/java/org/apache/iceberg/spark/data/SparkOrcReader.java @@ -62,17 +62,16 @@ public void setBatchContext(long batchOffsetInFile) { reader.setBatchContext(batchOffsetInFile); } - private static class ReadBuilder extends OrcSchemaWithTypeVisitor> { - private final Map idToConstant; + private static class ReadBuilder extends SparkDefaultValueAwareOrcSchemaWithTypeVisitor> { private ReadBuilder(Map idToConstant) { - this.idToConstant = idToConstant; + super(idToConstant); } @Override public OrcValueReader record( Types.StructType expected, TypeDescription record, List names, List> fields) { - return SparkOrcValueReaders.struct(fields, expected, idToConstant); + return SparkOrcValueReaders.struct(fields, expected, getIdToConstant()); } @Override diff --git a/spark/v3.2/spark/src/main/java/org/apache/iceberg/spark/data/vectorized/ConstantArrayColumnVector.java b/spark/v3.2/spark/src/main/java/org/apache/iceberg/spark/data/vectorized/ConstantArrayColumnVector.java new file mode 100644 index 000000000000..cb5706d8157e --- /dev/null +++ b/spark/v3.2/spark/src/main/java/org/apache/iceberg/spark/data/vectorized/ConstantArrayColumnVector.java @@ -0,0 +1,119 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. 
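Note: on the ORC read path, buildOrcProjection (earlier in this patch) drops evolved-with-default columns from the reader schema, so the Iceberg struct can have more fields than the ORC record; SparkDefaultValueAwareOrcSchemaWithTypeVisitor walks both field lists in parallel, matches on field id, and routes the unmatched Iceberg fields into idToConstant. A hedged sketch of a read schema that exercises this path, assuming only col1 exists in the data file (names, ids, and the default are illustrative):

    import org.apache.iceberg.Schema;
    import org.apache.iceberg.types.Types;

    class OrcDefaultProjectionSketch {
      static Schema readSchema() {
        return new Schema(
            // col1 exists in the ORC file, so it is projected and read normally.
            Types.NestedField.required(1, "col1", Types.IntegerType.get()),
            // col2 is new and carries an initial default: buildOrcProjection skips it, and the
            // visitor above maps id 2 to the converted default ("foo") in idToConstant instead.
            Types.NestedField.required(2, "col2", Types.StringType.get(), "added later", "foo", "foo"));
      }
    }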
+ */ + +package org.apache.iceberg.spark.data.vectorized; + +import java.util.Arrays; +import org.apache.spark.sql.catalyst.InternalRow; +import org.apache.spark.sql.catalyst.util.ArrayData; +import org.apache.spark.sql.catalyst.util.MapData; +import org.apache.spark.sql.types.ArrayType; +import org.apache.spark.sql.types.DataType; +import org.apache.spark.sql.types.Decimal; +import org.apache.spark.sql.types.MapType; +import org.apache.spark.sql.types.StructType; +import org.apache.spark.sql.vectorized.ColumnVector; +import org.apache.spark.sql.vectorized.ColumnarArray; +import org.apache.spark.sql.vectorized.ColumnarMap; +import org.apache.spark.unsafe.types.UTF8String; + +public class ConstantArrayColumnVector extends ConstantColumnVector { + + private final Object[] constantArray; + + public ConstantArrayColumnVector(DataType type, int batchSize, Object[] constantArray) { + super(type, batchSize, constantArray); + this.constantArray = constantArray; + } + + @Override + public boolean getBoolean(int rowId) { + return (boolean) constantArray[rowId]; + } + + @Override + public byte getByte(int rowId) { + return (byte) constantArray[rowId]; + } + + @Override + public short getShort(int rowId) { + return (short) constantArray[rowId]; + } + + @Override + public int getInt(int rowId) { + return (int) constantArray[rowId]; + } + + @Override + public long getLong(int rowId) { + return (long) constantArray[rowId]; + } + + @Override + public float getFloat(int rowId) { + return (float) constantArray[rowId]; + } + + @Override + public double getDouble(int rowId) { + return (double) constantArray[rowId]; + } + + @Override + public Decimal getDecimal(int rowId, int precision, int scale) { + return (Decimal) constantArray[rowId]; + } + + @Override + public UTF8String getUTF8String(int rowId) { + return (UTF8String) constantArray[rowId]; + } + + @Override + public byte[] getBinary(int rowId) { + return (byte[]) constantArray[rowId]; + } + + @Override + public ColumnarArray getArray(int rowId) { + return new ColumnarArray( + new ConstantArrayColumnVector(((ArrayType) type).elementType(), getBatchSize(), + ((ArrayData) constantArray[rowId]).array()), + 0, + ((ArrayData) constantArray[rowId]).numElements()); + } + + @Override + public ColumnarMap getMap(int rowId) { + ColumnVector keys = new ConstantArrayColumnVector(((MapType) type).keyType(), getBatchSize(), + ((MapData) constantArray[rowId]).keyArray().array()); + ColumnVector values = new ConstantArrayColumnVector(((MapType) type).valueType(), getBatchSize(), + ((MapData) constantArray[rowId]).valueArray().array()); + return new ColumnarMap(keys, values, 0, ((MapData) constantArray[rowId]).numElements()); + } + + @Override + public ColumnVector getChild(int ordinal) { + DataType fieldType = ((StructType) type).fields()[ordinal].dataType(); + return new ConstantArrayColumnVector(fieldType, getBatchSize(), + Arrays.stream(constantArray).map(row -> ((InternalRow) row).get(ordinal, fieldType)).toArray()); + } +} diff --git a/spark/v3.2/spark/src/main/java/org/apache/iceberg/spark/data/vectorized/ConstantColumnVector.java b/spark/v3.2/spark/src/main/java/org/apache/iceberg/spark/data/vectorized/ConstantColumnVector.java index 3cdea65b2877..c8f3a0bc59ef 100644 --- a/spark/v3.2/spark/src/main/java/org/apache/iceberg/spark/data/vectorized/ConstantColumnVector.java +++ b/spark/v3.2/spark/src/main/java/org/apache/iceberg/spark/data/vectorized/ConstantColumnVector.java @@ -21,7 +21,14 @@ import org.apache.iceberg.spark.SparkSchemaUtil; import 
org.apache.iceberg.types.Type; +import org.apache.spark.sql.catalyst.InternalRow; +import org.apache.spark.sql.catalyst.util.ArrayData; +import org.apache.spark.sql.catalyst.util.MapData; +import org.apache.spark.sql.types.ArrayType; +import org.apache.spark.sql.types.DataType; import org.apache.spark.sql.types.Decimal; +import org.apache.spark.sql.types.MapType; +import org.apache.spark.sql.types.StructType; import org.apache.spark.sql.vectorized.ColumnVector; import org.apache.spark.sql.vectorized.ColumnarArray; import org.apache.spark.sql.vectorized.ColumnarMap; @@ -38,6 +45,16 @@ class ConstantColumnVector extends ColumnVector { this.batchSize = batchSize; } + ConstantColumnVector(DataType type, int batchSize, Object constant) { + super(type); + this.constant = constant; + this.batchSize = batchSize; + } + + protected int getBatchSize() { + return batchSize; + } + @Override public void close() { } @@ -94,12 +111,20 @@ public double getDouble(int rowId) { @Override public ColumnarArray getArray(int rowId) { - throw new UnsupportedOperationException("ConstantColumnVector only supports primitives"); + return new ColumnarArray( + new ConstantArrayColumnVector(((ArrayType) type).elementType(), batchSize, + ((ArrayData) constant).array()), + 0, + ((ArrayData) constant).numElements()); } @Override public ColumnarMap getMap(int ordinal) { - throw new UnsupportedOperationException("ConstantColumnVector only supports primitives"); + ColumnVector keys = new ConstantArrayColumnVector(((MapType) type).keyType(), batchSize, + ((MapData) constant).keyArray().array()); + ColumnVector values = new ConstantArrayColumnVector(((MapType) type).valueType(), batchSize, + ((MapData) constant).valueArray().array()); + return new ColumnarMap(keys, values, 0, ((MapData) constant).numElements()); } @Override @@ -119,6 +144,8 @@ public byte[] getBinary(int rowId) { @Override public ColumnVector getChild(int ordinal) { - throw new UnsupportedOperationException("ConstantColumnVector only supports primitives"); + DataType fieldType = ((StructType) type).fields()[ordinal].dataType(); + return new ConstantColumnVector(fieldType, batchSize, + ((InternalRow) constant).get(ordinal, fieldType)); } } diff --git a/spark/v3.2/spark/src/main/java/org/apache/iceberg/spark/data/vectorized/VectorizedSparkOrcReaders.java b/spark/v3.2/spark/src/main/java/org/apache/iceberg/spark/data/vectorized/VectorizedSparkOrcReaders.java index 418c25993a7e..5173beba69b2 100644 --- a/spark/v3.2/spark/src/main/java/org/apache/iceberg/spark/data/vectorized/VectorizedSparkOrcReaders.java +++ b/spark/v3.2/spark/src/main/java/org/apache/iceberg/spark/data/vectorized/VectorizedSparkOrcReaders.java @@ -30,6 +30,7 @@ import org.apache.iceberg.orc.OrcValueReaders; import org.apache.iceberg.relocated.com.google.common.collect.Lists; import org.apache.iceberg.spark.SparkSchemaUtil; +import org.apache.iceberg.spark.data.SparkDefaultValueAwareOrcSchemaWithTypeVisitor; import org.apache.iceberg.spark.data.SparkOrcValueReaders; import org.apache.iceberg.types.Type; import org.apache.iceberg.types.Types; @@ -80,17 +81,15 @@ ColumnVector convert(org.apache.orc.storage.ql.exec.vector.ColumnVector columnVe long batchOffsetInFile); } - private static class ReadBuilder extends OrcSchemaWithTypeVisitor { - private final Map idToConstant; - + private static class ReadBuilder extends SparkDefaultValueAwareOrcSchemaWithTypeVisitor { private ReadBuilder(Map idToConstant) { - this.idToConstant = idToConstant; + super(idToConstant); } @Override public Converter 
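Note: the ConstantColumnVector and ConstantArrayColumnVector changes let the vectorized ORC path serve non-primitive defaults: a struct default is an InternalRow whose children become constant vectors, and array or map defaults are unpacked element by element. A test-style sketch, assuming same-package access because both vector classes are package-private; the values are illustrative:

    package org.apache.iceberg.spark.data.vectorized;

    import org.apache.spark.sql.catalyst.util.GenericArrayData;
    import org.apache.spark.sql.types.DataTypes;
    import org.apache.spark.sql.vectorized.ColumnarArray;

    class ConstantVectorSketch {
      static void example() {
        // An array<int> default of [1, 2], repeated for every row of a 100-row batch.
        ConstantColumnVector vector = new ConstantColumnVector(
            DataTypes.createArrayType(DataTypes.IntegerType), 100,
            new GenericArrayData(new Object[] {1, 2}));
        ColumnarArray row = vector.getArray(0);  // the same constant is returned for any rowId
        int first = row.getInt(0);               // 1
        int second = row.getInt(1);              // 2
      }
    }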
record(Types.StructType iStruct, TypeDescription record, List names, List fields) { - return new StructConverter(iStruct, fields, idToConstant); + return new StructConverter(iStruct, fields, getIdToConstant()); } @Override diff --git a/spark/v3.2/spark/src/main/java/org/apache/iceberg/spark/source/BaseDataReader.java b/spark/v3.2/spark/src/main/java/org/apache/iceberg/spark/source/BaseDataReader.java index f0664c7e8e29..cd57f7cd4f50 100644 --- a/spark/v3.2/spark/src/main/java/org/apache/iceberg/spark/source/BaseDataReader.java +++ b/spark/v3.2/spark/src/main/java/org/apache/iceberg/spark/source/BaseDataReader.java @@ -42,6 +42,7 @@ import org.apache.iceberg.io.InputFile; import org.apache.iceberg.relocated.com.google.common.base.Preconditions; import org.apache.iceberg.relocated.com.google.common.collect.ImmutableMap; +import org.apache.iceberg.relocated.com.google.common.collect.Lists; import org.apache.iceberg.relocated.com.google.common.collect.Maps; import org.apache.iceberg.types.Type; import org.apache.iceberg.types.Types.NestedField; @@ -50,6 +51,8 @@ import org.apache.iceberg.util.PartitionUtil; import org.apache.spark.rdd.InputFileBlockHolder; import org.apache.spark.sql.catalyst.expressions.GenericInternalRow; +import org.apache.spark.sql.catalyst.util.ArrayBasedMapData; +import org.apache.spark.sql.catalyst.util.GenericArrayData; import org.apache.spark.sql.types.Decimal; import org.apache.spark.unsafe.types.UTF8String; import org.slf4j.Logger; @@ -60,7 +63,7 @@ * * @param is the Java class returned by this reader whose objects contain one or more rows. */ -abstract class BaseDataReader implements Closeable { +public abstract class BaseDataReader implements Closeable { private static final Logger LOG = LoggerFactory.getLogger(BaseDataReader.class); private final Table table; @@ -155,7 +158,7 @@ protected InputFile getInputFile(String location) { } } - protected static Object convertConstant(Type type, Object value) { + public static Object convertConstant(Type type, Object value) { if (value == null) { return null; } @@ -187,15 +190,39 @@ protected static Object convertConstant(Type type, Object value) { List fields = structType.fields(); Object[] values = new Object[fields.size()]; - StructLike struct = (StructLike) value; - for (int index = 0; index < fields.size(); index++) { - NestedField field = fields.get(index); - Type fieldType = field.type(); - values[index] = convertConstant(fieldType, struct.get(index, fieldType.typeId().javaClass())); + if (value instanceof StructLike) { + StructLike struct = (StructLike) value; + + for (int index = 0; index < fields.size(); index++) { + NestedField field = fields.get(index); + Type fieldType = field.type(); + values[index] = convertConstant(fieldType, struct.get(index, fieldType.typeId().javaClass())); + } + } else if (value instanceof Map) { + // This branch handles Map-based default value for StructType + Map map = (Map) value; + for (int index = 0; index < fields.size(); index++) { + NestedField field = fields.get(index); + Integer fieldId = field.fieldId(); + values[index] = convertConstant(field.type(), map.get(fieldId)); + } } return new GenericInternalRow(values); + case LIST: + return new GenericArrayData(((List) value).stream() + .map(e -> convertConstant(type.asListType().elementType(), e)).toArray()); + case MAP: + List keyList = Lists.newArrayList(); + List valueList = Lists.newArrayList(); + for (Map.Entry entry : ((Map) value).entrySet()) { + keyList.add(convertConstant(type.asMapType().keyType(), entry.getKey())); + 
valueList.add(convertConstant(type.asMapType().valueType(), entry.getValue())); + } + return new ArrayBasedMapData( + new GenericArrayData(keyList.toArray()), + new GenericArrayData(valueList.toArray())); default: } return value; diff --git a/spark/v3.2/spark/src/test/java/org/apache/iceberg/spark/data/TestSparkAvroReadDefaultValue.java b/spark/v3.2/spark/src/test/java/org/apache/iceberg/spark/data/TestSparkAvroReadDefaultValue.java new file mode 100644 index 000000000000..3322bda50432 --- /dev/null +++ b/spark/v3.2/spark/src/test/java/org/apache/iceberg/spark/data/TestSparkAvroReadDefaultValue.java @@ -0,0 +1,141 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.apache.iceberg.spark.data; + +import java.io.File; +import java.io.IOException; +import java.util.Collections; +import java.util.List; +import java.util.stream.Collectors; +import java.util.stream.IntStream; +import org.apache.avro.generic.GenericData; +import org.apache.iceberg.Files; +import org.apache.iceberg.Schema; +import org.apache.iceberg.avro.Avro; +import org.apache.iceberg.avro.AvroIterable; +import org.apache.iceberg.avro.AvroSchemaUtil; +import org.apache.iceberg.io.FileAppender; +import org.apache.iceberg.relocated.com.google.common.collect.ImmutableList; +import org.apache.iceberg.relocated.com.google.common.collect.ImmutableMap; +import org.apache.iceberg.relocated.com.google.common.collect.Lists; +import org.apache.iceberg.spark.SparkSchemaUtil; +import org.apache.iceberg.types.Type; +import org.apache.iceberg.types.Types; +import org.apache.spark.sql.catalyst.InternalRow; +import org.junit.Assert; +import org.junit.Rule; +import org.junit.Test; +import org.junit.rules.TemporaryFolder; + +import static org.apache.iceberg.spark.data.TestHelpers.assertEqualsUnsafe; +import static org.apache.iceberg.types.Types.NestedField.required; + +public class TestSparkAvroReadDefaultValue { + + @Rule + public TemporaryFolder temp = new TemporaryFolder(); + + @Test + public void testAvroDefaultValues() throws IOException { + Schema writeSchema = new Schema( + required(1, "col1", Types.IntegerType.get()) + ); + List data = Collections.nCopies( + 100, + RandomData.generateSpark(writeSchema, 1, 0L).iterator().next()); + + Type[] defaultValueTypes = { + Types.StringType.get(), + Types.ListType.ofRequired(10, Types.IntegerType.get()), + Types.MapType.ofRequired(11, 12, Types.StringType.get(), + Types.IntegerType.get()), + Types.StructType.of( + Types.NestedField.required(13, "nested_col1", Types.IntegerType.get())), + Types.ListType.ofRequired(14, Types.StructType.of( + Types.NestedField.required(15, "nested_col2", Types.StringType.get()))) + }; + + Object[] defaultValues = { + "foo", // string default + ImmutableList.of(1, 2), // array default + 
ImmutableMap.of("bar", 1), // map default + ImmutableMap.of(13, 1), // struct default + ImmutableList.of(ImmutableMap.of(15, "xyz"), ImmutableMap.of(15, "baz")) // array of struct default + }; + + // evolve the schema to add col2, col3, col4, col5, col6 with default value + List newSchemaFields = Lists.newArrayList(writeSchema.findField(1)); + newSchemaFields.addAll(IntStream.range(0, defaultValues.length).mapToObj(i -> Types.NestedField.required(i + 2, + String.format("col%d", i + 2), defaultValueTypes[i], "doc", defaultValues[i], defaultValues[i])).collect( + Collectors.toList())); + Schema readSchema = new Schema(newSchemaFields); + + // Construct the expected read-back data as avro record + GenericData.Record r1 = new GenericData.Record(AvroSchemaUtil.convert(defaultValueTypes[3])); + r1.put(0, 1); + GenericData.Record r2 = + new GenericData.Record(AvroSchemaUtil.convert(defaultValueTypes[4].asListType().elementType())); + GenericData.Record r3 = + new GenericData.Record(AvroSchemaUtil.convert(defaultValueTypes[4].asListType().elementType())); + r2.put(0, "xyz"); + r3.put(0, "baz"); + Object[] avroDefaultValues = { + "foo", // string default + ImmutableList.of(1, 2), // array default + ImmutableMap.of("bar", 1), // map default + r1, // struct default + ImmutableList.of(r2, r3) // array of struct default + }; + + List expectedRecords = data.stream().map(datum -> { + GenericData.Record record = new GenericData.Record(AvroSchemaUtil.convert(readSchema.asStruct())); + record.put(0, datum.getInt(0)); + for (int j = 0; j < avroDefaultValues.length; j++) { + record.put(j + 1, avroDefaultValues[j]); + } + return record; + }).collect(Collectors.toList()); + + File testFile = temp.newFile(); + Assert.assertTrue("Delete should succeed", testFile.delete()); + + try ( + FileAppender writer = Avro.write(Files.localOutput(testFile)) + .createWriterFunc(avroSchema -> new SparkAvroWriter(SparkSchemaUtil.convert(writeSchema))) + .schema(writeSchema) + .named("test") + .build()) { + writer.addAll(data); + } + + List actualRows; + try ( + AvroIterable reader = Avro.read(Files.localInput(testFile)) + .createReaderFunc(SparkAvroReader::new) + .project(readSchema) + .build()) { + actualRows = Lists.newArrayList(reader); + } + + for (int i = 0; i < expectedRecords.size(); i += 1) { + assertEqualsUnsafe(readSchema.asStruct(), expectedRecords.get(i), actualRows.get(i)); + } + } +} diff --git a/spark/v3.2/spark/src/test/java/org/apache/iceberg/spark/data/TestSparkOrcReadDefaultValue.java b/spark/v3.2/spark/src/test/java/org/apache/iceberg/spark/data/TestSparkOrcReadDefaultValue.java new file mode 100644 index 000000000000..60898527a3ca --- /dev/null +++ b/spark/v3.2/spark/src/test/java/org/apache/iceberg/spark/data/TestSparkOrcReadDefaultValue.java @@ -0,0 +1,154 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. 
See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.apache.iceberg.spark.data; + +import java.io.File; +import java.io.IOException; +import java.util.Collections; +import java.util.Iterator; +import java.util.List; +import java.util.stream.Collectors; +import java.util.stream.IntStream; +import org.apache.iceberg.Files; +import org.apache.iceberg.Schema; +import org.apache.iceberg.io.CloseableIterable; +import org.apache.iceberg.io.FileAppender; +import org.apache.iceberg.orc.ORC; +import org.apache.iceberg.relocated.com.google.common.collect.ImmutableList; +import org.apache.iceberg.relocated.com.google.common.collect.ImmutableMap; +import org.apache.iceberg.relocated.com.google.common.collect.Iterators; +import org.apache.iceberg.relocated.com.google.common.collect.Lists; +import org.apache.iceberg.spark.data.vectorized.VectorizedSparkOrcReaders; +import org.apache.iceberg.spark.source.BaseDataReader; +import org.apache.iceberg.types.Type; +import org.apache.iceberg.types.Types; +import org.apache.spark.sql.catalyst.InternalRow; +import org.apache.spark.sql.catalyst.expressions.GenericInternalRow; +import org.apache.spark.sql.vectorized.ColumnarBatch; +import org.junit.Assert; +import org.junit.Rule; +import org.junit.Test; +import org.junit.rules.TemporaryFolder; + +import static org.apache.iceberg.spark.data.TestHelpers.assertEquals; +import static org.apache.iceberg.types.Types.NestedField.required; + +public class TestSparkOrcReadDefaultValue { + + @Rule + public TemporaryFolder temp = new TemporaryFolder(); + + @Test + public void testOrcDefaultValues() throws IOException { + + Schema writeSchema = new Schema( + required(1, "col1", Types.IntegerType.get()) + ); + List data = Collections.nCopies( + 100, + RandomData.generateSpark(writeSchema, 1, 0L).iterator().next()); + + Type[] defaultValueTypes = { + Types.StringType.get(), + Types.ListType.ofRequired(10, Types.IntegerType.get()), + Types.MapType.ofRequired(11, 12, Types.StringType.get(), + Types.IntegerType.get()), + Types.StructType.of( + Types.NestedField.required(13, "nested_col1", Types.IntegerType.get())), + Types.ListType.ofRequired(14, Types.StructType.of( + Types.NestedField.required(15, "nested_col2", Types.StringType.get()))) + }; + + Object[] defaultValues = { + "foo", // string default + ImmutableList.of(1, 2), // array default + ImmutableMap.of("bar", 1), // map default + ImmutableMap.of(13, 1), // struct default + ImmutableList.of(ImmutableMap.of(15, "xyz"), ImmutableMap.of(15, "baz")) // array of struct default + }; + + Object[] expectedDefaults = + IntStream.range(0, defaultValues.length).mapToObj(i -> BaseDataReader.convertConstant( + defaultValueTypes[i], + defaultValues[i])).toArray(); + + List expected = data.stream().map(internalRow -> { + Object[] values = new Object[6]; + values[0] = internalRow.getInt(0); + System.arraycopy(expectedDefaults, 0, values, 1, expectedDefaults.length); + return new GenericInternalRow(values); + }).collect(Collectors.toList()); + + // evolve the schema to add col2, col3, col4, col5, col6 with default value + List newSchemaFields = Lists.newArrayList(writeSchema.findField(1)); + newSchemaFields.addAll(IntStream.range(0, defaultValues.length).mapToObj(i -> Types.NestedField.required(i + 2, + String.format("col%d", i + 2), defaultValueTypes[i], "doc", defaultValues[i], defaultValues[i])).collect( + Collectors.toList())); + Schema readSchema = new Schema(newSchemaFields); + + final File testFile = 
temp.newFile(); + Assert.assertTrue("Delete should succeed", testFile.delete()); + + try (FileAppender writer = ORC.write(Files.localOutput(testFile)) + .createWriterFunc(SparkOrcWriter::new) + .schema(writeSchema) + .build()) { + writer.addAll(data); + } + + // Try to read back the data using the evolved readSchema, the returned data should + // have default values populated in them. + + // non-vectorized read + try (CloseableIterable reader = ORC.read(Files.localInput(testFile)) + .project(readSchema) + .createReaderFunc(readOrcSchema -> new SparkOrcReader(readSchema, readOrcSchema)) + .build()) { + + final Iterator actualRows = reader.iterator(); + final Iterator expectedRows = expected.iterator(); + while (expectedRows.hasNext()) { + Assert.assertTrue("Should have expected number of rows", actualRows.hasNext()); + assertEquals(readSchema, expectedRows.next(), actualRows.next()); + } + Assert.assertFalse("Should not have extra rows", actualRows.hasNext()); + } + + // vectorized-read + try (CloseableIterable reader = ORC.read(Files.localInput(testFile)) + .project(readSchema) + .createBatchedReaderFunc(readOrcSchema -> + VectorizedSparkOrcReaders.buildReader(readSchema, readOrcSchema, ImmutableMap.of())) + .build()) { + final Iterator actualRows = batchesToRows(reader.iterator()); + final Iterator expectedRows = expected.iterator(); + + while (expectedRows.hasNext()) { + Assert.assertTrue("Should have expected number of rows", actualRows.hasNext()); + assertEquals(readSchema, expectedRows.next(), actualRows.next()); + } + Assert.assertFalse("Should not have extra rows", actualRows.hasNext()); + } + } + + private Iterator batchesToRows(Iterator batches) { + return Iterators.concat(Iterators.transform(batches, ColumnarBatch::rowIterator)); + } +}