diff --git a/core/src/main/java/org/apache/iceberg/avro/AvroWithPartnerVisitor.java b/core/src/main/java/org/apache/iceberg/avro/AvroWithPartnerVisitor.java index b23b195d959a..692c1ead3fbf 100644 --- a/core/src/main/java/org/apache/iceberg/avro/AvroWithPartnerVisitor.java +++ b/core/src/main/java/org/apache/iceberg/avro/AvroWithPartnerVisitor.java @@ -37,7 +37,7 @@ public interface PartnerAccessors

{ P listElementPartner(P partnerList); } - static class FieldIDAccessors implements AvroWithPartnerVisitor.PartnerAccessors { + public static class FieldIDAccessors implements AvroWithPartnerVisitor.PartnerAccessors { private static final FieldIDAccessors INSTANCE = new FieldIDAccessors(); public static FieldIDAccessors get() { diff --git a/core/src/main/java/org/apache/iceberg/avro/ValueReaders.java b/core/src/main/java/org/apache/iceberg/avro/ValueReaders.java index 246671076c31..67f53d3636a6 100644 --- a/core/src/main/java/org/apache/iceberg/avro/ValueReaders.java +++ b/core/src/main/java/org/apache/iceberg/avro/ValueReaders.java @@ -194,11 +194,11 @@ public static ValueReader skipStruct(List> readers) { * @param idToConstant a map of field ID to constants values * @return a read plan that is a list of (position, reader) pairs */ - static List>> buildReadPlan( + public static List>> buildReadPlan( Types.StructType expected, Schema record, List> fieldReaders, - Map idToConstant) { + Map idToConstant) { Map idToPos = idToPos(expected); List>> readPlan = Lists.newArrayList(); diff --git a/spark/v3.5/spark/src/main/java/org/apache/iceberg/spark/data/SparkAvroReader.java b/spark/v3.5/spark/src/main/java/org/apache/iceberg/spark/data/SparkAvroReader.java index 4622d2928ac4..7d92d963a9f4 100644 --- a/spark/v3.5/spark/src/main/java/org/apache/iceberg/spark/data/SparkAvroReader.java +++ b/spark/v3.5/spark/src/main/java/org/apache/iceberg/spark/data/SparkAvroReader.java @@ -37,16 +37,28 @@ import org.apache.iceberg.types.Types; import org.apache.spark.sql.catalyst.InternalRow; +/** + * @deprecated will be removed in 1.8.0; use SparkPlannedAvroReader instead. + */ +@Deprecated public class SparkAvroReader implements DatumReader, SupportsRowPosition { private final Schema readSchema; private final ValueReader reader; private Schema fileSchema = null; + /** + * @deprecated will be removed in 1.8.0; use SparkPlannedAvroReader instead. + */ + @Deprecated public SparkAvroReader(org.apache.iceberg.Schema expectedSchema, Schema readSchema) { this(expectedSchema, readSchema, ImmutableMap.of()); } + /** + * @deprecated will be removed in 1.8.0; use SparkPlannedAvroReader instead. + */ + @Deprecated @SuppressWarnings("unchecked") public SparkAvroReader( org.apache.iceberg.Schema expectedSchema, Schema readSchema, Map constants) { diff --git a/spark/v3.5/spark/src/main/java/org/apache/iceberg/spark/data/SparkPlannedAvroReader.java b/spark/v3.5/spark/src/main/java/org/apache/iceberg/spark/data/SparkPlannedAvroReader.java new file mode 100644 index 000000000000..dc4af24685b3 --- /dev/null +++ b/spark/v3.5/spark/src/main/java/org/apache/iceberg/spark/data/SparkPlannedAvroReader.java @@ -0,0 +1,190 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.iceberg.spark.data; + +import java.io.IOException; +import java.util.List; +import java.util.Map; +import java.util.function.Supplier; +import org.apache.avro.LogicalType; +import org.apache.avro.LogicalTypes; +import org.apache.avro.Schema; +import org.apache.avro.io.DatumReader; +import org.apache.avro.io.Decoder; +import org.apache.iceberg.avro.AvroWithPartnerVisitor; +import org.apache.iceberg.avro.SupportsRowPosition; +import org.apache.iceberg.avro.ValueReader; +import org.apache.iceberg.avro.ValueReaders; +import org.apache.iceberg.relocated.com.google.common.collect.ImmutableMap; +import org.apache.iceberg.types.Type; +import org.apache.iceberg.types.Types; +import org.apache.iceberg.util.Pair; +import org.apache.spark.sql.catalyst.InternalRow; + +public class SparkPlannedAvroReader implements DatumReader, SupportsRowPosition { + + private final Types.StructType expectedType; + private final Map idToConstant; + private ValueReader reader; + + public static SparkPlannedAvroReader create(org.apache.iceberg.Schema schema) { + return create(schema, ImmutableMap.of()); + } + + public static SparkPlannedAvroReader create( + org.apache.iceberg.Schema schema, Map constants) { + return new SparkPlannedAvroReader(schema, constants); + } + + private SparkPlannedAvroReader( + org.apache.iceberg.Schema expectedSchema, Map constants) { + this.expectedType = expectedSchema.asStruct(); + this.idToConstant = constants; + } + + @Override + @SuppressWarnings("unchecked") + public void setSchema(Schema fileSchema) { + this.reader = + (ValueReader) + AvroWithPartnerVisitor.visit( + expectedType, + fileSchema, + new ReadBuilder(idToConstant), + AvroWithPartnerVisitor.FieldIDAccessors.get()); + } + + @Override + public InternalRow read(InternalRow reuse, Decoder decoder) throws IOException { + return reader.read(decoder, reuse); + } + + @Override + public void setRowPositionSupplier(Supplier posSupplier) { + if (reader instanceof SupportsRowPosition) { + ((SupportsRowPosition) reader).setRowPositionSupplier(posSupplier); + } + } + + private static class ReadBuilder extends AvroWithPartnerVisitor> { + private final Map idToConstant; + + private ReadBuilder(Map idToConstant) { + this.idToConstant = idToConstant; + } + + @Override + public ValueReader record(Type partner, Schema record, List> fieldReaders) { + if (partner == null) { + return ValueReaders.skipStruct(fieldReaders); + } + + Types.StructType expected = partner.asStructType(); + List>> readPlan = + ValueReaders.buildReadPlan(expected, record, fieldReaders, idToConstant); + + // TODO: should this pass expected so that struct.get can reuse containers? + return SparkValueReaders.struct(readPlan, expected.fields().size()); + } + + @Override + public ValueReader union(Type partner, Schema union, List> options) { + return ValueReaders.union(options); + } + + @Override + public ValueReader array(Type partner, Schema array, ValueReader elementReader) { + return SparkValueReaders.array(elementReader); + } + + @Override + public ValueReader arrayMap( + Type partner, Schema map, ValueReader keyReader, ValueReader valueReader) { + return SparkValueReaders.arrayMap(keyReader, valueReader); + } + + @Override + public ValueReader map(Type partner, Schema map, ValueReader valueReader) { + return SparkValueReaders.map(SparkValueReaders.strings(), valueReader); + } + + @Override + public ValueReader primitive(Type partner, Schema primitive) { + LogicalType logicalType = primitive.getLogicalType(); + if (logicalType != null) { + switch (logicalType.getName()) { + case "date": + // Spark uses the same representation + return ValueReaders.ints(); + + case "timestamp-millis": + // adjust to microseconds + ValueReader longs = ValueReaders.longs(); + return (ValueReader) (decoder, ignored) -> longs.read(decoder, null) * 1000L; + + case "timestamp-micros": + // Spark uses the same representation + return ValueReaders.longs(); + + case "decimal": + return SparkValueReaders.decimal( + ValueReaders.decimalBytesReader(primitive), + ((LogicalTypes.Decimal) logicalType).getScale()); + + case "uuid": + return SparkValueReaders.uuids(); + + default: + throw new IllegalArgumentException("Unknown logical type: " + logicalType); + } + } + + switch (primitive.getType()) { + case NULL: + return ValueReaders.nulls(); + case BOOLEAN: + return ValueReaders.booleans(); + case INT: + if (partner != null && partner.typeId() == Type.TypeID.LONG) { + return ValueReaders.intsAsLongs(); + } + return ValueReaders.ints(); + case LONG: + return ValueReaders.longs(); + case FLOAT: + if (partner != null && partner.typeId() == Type.TypeID.DOUBLE) { + return ValueReaders.floatsAsDoubles(); + } + return ValueReaders.floats(); + case DOUBLE: + return ValueReaders.doubles(); + case STRING: + return SparkValueReaders.strings(); + case FIXED: + return ValueReaders.fixed(primitive.getFixedSize()); + case BYTES: + return ValueReaders.bytes(); + case ENUM: + return SparkValueReaders.enums(primitive.getEnumSymbols()); + default: + throw new IllegalArgumentException("Unsupported type: " + primitive); + } + } + } +} diff --git a/spark/v3.5/spark/src/main/java/org/apache/iceberg/spark/data/SparkValueReaders.java b/spark/v3.5/spark/src/main/java/org/apache/iceberg/spark/data/SparkValueReaders.java index 3cbf38d88bf4..7e65535f5ecb 100644 --- a/spark/v3.5/spark/src/main/java/org/apache/iceberg/spark/data/SparkValueReaders.java +++ b/spark/v3.5/spark/src/main/java/org/apache/iceberg/spark/data/SparkValueReaders.java @@ -32,6 +32,7 @@ import org.apache.iceberg.avro.ValueReaders; import org.apache.iceberg.relocated.com.google.common.collect.Lists; import org.apache.iceberg.types.Types; +import org.apache.iceberg.util.Pair; import org.apache.iceberg.util.UUIDUtil; import org.apache.spark.sql.catalyst.InternalRow; import org.apache.spark.sql.catalyst.expressions.GenericInternalRow; @@ -74,6 +75,11 @@ static ValueReader map(ValueReader keyReader, ValueReader< return new MapReader(keyReader, valueReader); } + static ValueReader struct( + List>> readPlan, int numFields) { + return new PlannedStructReader(readPlan, numFields); + } + static ValueReader struct( List> readers, Types.StructType struct, Map idToConstant) { return new StructReader(readers, struct, idToConstant); @@ -249,6 +255,38 @@ public ArrayBasedMapData read(Decoder decoder, Object reuse) throws IOException } } + static class PlannedStructReader extends ValueReaders.PlannedStructReader { + private final int numFields; + + protected PlannedStructReader(List>> readPlan, int numFields) { + super(readPlan); + this.numFields = numFields; + } + + @Override + protected InternalRow reuseOrCreate(Object reuse) { + if (reuse instanceof GenericInternalRow + && ((GenericInternalRow) reuse).numFields() == numFields) { + return (InternalRow) reuse; + } + return new GenericInternalRow(numFields); + } + + @Override + protected Object get(InternalRow struct, int pos) { + return null; + } + + @Override + protected void set(InternalRow struct, int pos, Object value) { + if (value != null) { + struct.update(pos, value); + } else { + struct.setNullAt(pos); + } + } + } + static class StructReader extends ValueReaders.StructReader { private final int numFields; diff --git a/spark/v3.5/spark/src/main/java/org/apache/iceberg/spark/source/BaseRowReader.java b/spark/v3.5/spark/src/main/java/org/apache/iceberg/spark/source/BaseRowReader.java index 927084caea1c..eb97185e21f1 100644 --- a/spark/v3.5/spark/src/main/java/org/apache/iceberg/spark/source/BaseRowReader.java +++ b/spark/v3.5/spark/src/main/java/org/apache/iceberg/spark/source/BaseRowReader.java @@ -32,9 +32,9 @@ import org.apache.iceberg.orc.ORC; import org.apache.iceberg.parquet.Parquet; import org.apache.iceberg.relocated.com.google.common.collect.Sets; -import org.apache.iceberg.spark.data.SparkAvroReader; import org.apache.iceberg.spark.data.SparkOrcReader; import org.apache.iceberg.spark.data.SparkParquetReaders; +import org.apache.iceberg.spark.data.SparkPlannedAvroReader; import org.apache.iceberg.types.TypeUtil; import org.apache.spark.sql.catalyst.InternalRow; @@ -77,7 +77,7 @@ private CloseableIterable newAvroIterable( .reuseContainers() .project(projection) .split(start, length) - .createReaderFunc(readSchema -> new SparkAvroReader(projection, readSchema, idToConstant)) + .createReaderFunc(readSchema -> SparkPlannedAvroReader.create(projection, idToConstant)) .withNameMapping(nameMapping()) .build(); } diff --git a/spark/v3.5/spark/src/test/java/org/apache/iceberg/spark/data/TestSparkAvroEnums.java b/spark/v3.5/spark/src/test/java/org/apache/iceberg/spark/data/TestSparkAvroEnums.java index 11e60187fdc3..0dc8b48b2317 100644 --- a/spark/v3.5/spark/src/test/java/org/apache/iceberg/spark/data/TestSparkAvroEnums.java +++ b/spark/v3.5/spark/src/test/java/org/apache/iceberg/spark/data/TestSparkAvroEnums.java @@ -79,7 +79,7 @@ public void writeAndValidateEnums() throws IOException { List rows; try (AvroIterable reader = Avro.read(Files.localInput(testFile)) - .createReaderFunc(SparkAvroReader::new) + .createResolvingReader(SparkPlannedAvroReader::create) .project(schema) .build()) { rows = Lists.newArrayList(reader); diff --git a/spark/v3.5/spark/src/test/java/org/apache/iceberg/spark/data/TestSparkAvroReader.java b/spark/v3.5/spark/src/test/java/org/apache/iceberg/spark/data/TestSparkAvroReader.java index 3e5088258a49..7f9bcbacf298 100644 --- a/spark/v3.5/spark/src/test/java/org/apache/iceberg/spark/data/TestSparkAvroReader.java +++ b/spark/v3.5/spark/src/test/java/org/apache/iceberg/spark/data/TestSparkAvroReader.java @@ -51,7 +51,7 @@ protected void writeAndValidate(Schema schema) throws IOException { List rows; try (AvroIterable reader = Avro.read(Files.localInput(testFile)) - .createReaderFunc(SparkAvroReader::new) + .createResolvingReader(SparkPlannedAvroReader::create) .project(schema) .build()) { rows = Lists.newArrayList(reader); diff --git a/spark/v3.5/spark/src/test/java/org/apache/iceberg/spark/source/TestDataFrameWrites.java b/spark/v3.5/spark/src/test/java/org/apache/iceberg/spark/source/TestDataFrameWrites.java index 336ee5a8d2ea..bf49bfba550f 100644 --- a/spark/v3.5/spark/src/test/java/org/apache/iceberg/spark/source/TestDataFrameWrites.java +++ b/spark/v3.5/spark/src/test/java/org/apache/iceberg/spark/source/TestDataFrameWrites.java @@ -56,7 +56,7 @@ import org.apache.iceberg.spark.SparkWriteOptions; import org.apache.iceberg.spark.data.ParameterizedAvroDataTest; import org.apache.iceberg.spark.data.RandomData; -import org.apache.iceberg.spark.data.SparkAvroReader; +import org.apache.iceberg.spark.data.SparkPlannedAvroReader; import org.apache.iceberg.types.Types; import org.apache.spark.SparkException; import org.apache.spark.TaskContext; @@ -259,7 +259,7 @@ private Dataset createDataset(Iterable records, Schema schema) thro List rows = Lists.newArrayList(); try (AvroIterable reader = Avro.read(Files.localInput(testFile)) - .createReaderFunc(SparkAvroReader::new) + .createResolvingReader(SparkPlannedAvroReader::create) .project(schema) .build()) {