diff --git a/core/src/main/java/org/apache/iceberg/ManifestWriter.java b/core/src/main/java/org/apache/iceberg/ManifestWriter.java
index c99c491ee39b..2a6772155cd0 100644
--- a/core/src/main/java/org/apache/iceberg/ManifestWriter.java
+++ b/core/src/main/java/org/apache/iceberg/ManifestWriter.java
@@ -153,7 +153,7 @@ public void add(DataFile addedFile) {
     addEntry(reused.wrapAppend(snapshotId, addedFile));
   }
 
-  public void add(ManifestEntry entry) {
+  void add(ManifestEntry entry) {
     addEntry(reused.wrapAppend(snapshotId, entry.file()));
   }
 
diff --git a/core/src/main/java/org/apache/iceberg/avro/ValueReaders.java b/core/src/main/java/org/apache/iceberg/avro/ValueReaders.java
index 14dc8698be81..cbbf77c1e146 100644
--- a/core/src/main/java/org/apache/iceberg/avro/ValueReaders.java
+++ b/core/src/main/java/org/apache/iceberg/avro/ValueReaders.java
@@ -40,6 +40,8 @@
 import org.apache.avro.io.ResolvingDecoder;
 import org.apache.avro.util.Utf8;
 import org.apache.iceberg.common.DynConstructors;
+import org.apache.iceberg.types.Type;
+import org.apache.iceberg.types.Types;
 
 import static java.util.Collections.emptyIterator;
 
@@ -561,12 +563,32 @@ public Map<K, V> read(Decoder decoder, Object reuse) throws IOException {
   public abstract static class StructReader<S> implements ValueReader<S> {
     private final ValueReader<?>[] readers;
+    private final int[] positions;
+    private final Object[] constants;
 
     protected StructReader(List<ValueReader<?>> readers) {
-      this.readers = new ValueReader[readers.size()];
-      for (int i = 0; i < this.readers.length; i += 1) {
-        this.readers[i] = readers.get(i);
+      this.readers = readers.toArray(new ValueReader[0]);
+      this.positions = new int[0];
+      this.constants = new Object[0];
+    }
+
+    protected StructReader(List<ValueReader<?>> readers, Types.StructType struct, Map<Integer, ?> idToConstant) {
+      this.readers = readers.toArray(new ValueReader[0]);
+
+      List<Types.NestedField> fields = struct.fields();
+      List<Integer> positionList = Lists.newArrayListWithCapacity(fields.size());
+      List<Object> constantList = Lists.newArrayListWithCapacity(fields.size());
+      for (int pos = 0; pos < fields.size(); pos += 1) {
+        Types.NestedField field = fields.get(pos);
+        Object constant = idToConstant.get(field.fieldId());
+        if (constant != null) {
+          positionList.add(pos);
+          constantList.add(prepareConstant(field.type(), constant));
+        }
       }
+
+      this.positions = positionList.stream().mapToInt(Integer::intValue).toArray();
+      this.constants = constantList.toArray();
     }
 
     protected abstract S reuseOrCreate(Object reuse);
@@ -575,6 +597,10 @@ protected StructReader(List<ValueReader<?>> readers) {
 
     protected abstract void set(S struct, int pos, Object value);
 
+    protected Object prepareConstant(Type type, Object value) {
+      return value;
+    }
+
     public ValueReader<?> reader(int pos) {
       return readers[pos];
     }
@@ -597,6 +623,10 @@ public S read(Decoder decoder, Object reuse) throws IOException {
         }
       }
 
+      for (int i = 0; i < positions.length; i += 1) {
+        set(struct, positions[i], constants[i]);
+      }
+
       return struct;
     }
   }
diff --git a/core/src/main/java/org/apache/iceberg/util/ByteBuffers.java b/core/src/main/java/org/apache/iceberg/util/ByteBuffers.java
index da692f71d1f8..0c6b26abd225 100644
--- a/core/src/main/java/org/apache/iceberg/util/ByteBuffers.java
+++ b/core/src/main/java/org/apache/iceberg/util/ByteBuffers.java
@@ -41,7 +41,7 @@ public static byte[] toByteArray(ByteBuffer buffer) {
       }
     } else {
       byte[] bytes = new byte[buffer.remaining()];
-      buffer.get(bytes);
+      buffer.asReadOnlyBuffer().get(bytes);
       return bytes;
     }
   }
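Aside (reviewer sketch, not part of the patch): the new two-argument StructReader constructor resolves, by field ID, which struct positions are backed by entries in idToConstant, and read() then writes those constants through set() after the per-field readers have run. A minimal subclass illustrating the contract, using a hypothetical Object[]-backed row (the real subclasses are GenericRecordReader and SparkValueReaders.StructReader further down in this patch):

import java.util.List;
import java.util.Map;
import org.apache.iceberg.avro.ValueReader;
import org.apache.iceberg.avro.ValueReaders;
import org.apache.iceberg.types.Types;

// Illustrative only: rows are plain Object[]; constant positions are filled by the base class.
class ArrayStructReader extends ValueReaders.StructReader<Object[]> {
  private final int numFields;

  ArrayStructReader(List<ValueReader<?>> readers, Types.StructType struct, Map<Integer, ?> idToConstant) {
    super(readers, struct, idToConstant);  // records positions whose field IDs appear in idToConstant
    this.numFields = readers.size();
  }

  @Override
  protected Object[] reuseOrCreate(Object reuse) {
    if (reuse instanceof Object[] && ((Object[]) reuse).length == numFields) {
      return (Object[]) reuse;
    }
    return new Object[numFields];
  }

  @Override
  protected Object get(Object[] struct, int pos) {
    return struct[pos];
  }

  @Override
  protected void set(Object[] struct, int pos, Object value) {
    // called for decoded fields and, after decoding, once per constant position
    struct[pos] = value;
  }
}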
diff --git a/core/src/main/java/org/apache/iceberg/util/PartitionUtil.java b/core/src/main/java/org/apache/iceberg/util/PartitionUtil.java
new file mode 100644
index 000000000000..9a2aa992b04f
--- /dev/null
+++ b/core/src/main/java/org/apache/iceberg/util/PartitionUtil.java
@@ -0,0 +1,48 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.iceberg.util;
+
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import org.apache.iceberg.FileScanTask;
+import org.apache.iceberg.PartitionField;
+import org.apache.iceberg.PartitionSpec;
+import org.apache.iceberg.StructLike;
+
+public class PartitionUtil {
+  private PartitionUtil() {
+  }
+
+  public static Map<Integer, ?> constantsMap(FileScanTask task) {
+    return constantsMap(task.spec(), task.file().partition());
+  }
+
+  private static Map<Integer, ?> constantsMap(PartitionSpec spec, StructLike partitionData) {
+    // use java.util.HashMap because partition data may contain null values
+    Map<Integer, Object> idToConstant = new HashMap<>();
+    List<PartitionField> fields = spec.fields();
+    for (int pos = 0; pos < fields.size(); pos += 1) {
+      PartitionField field = fields.get(pos);
+      idToConstant.put(field.sourceId(), partitionData.get(pos, Object.class));
+    }
+    return idToConstant;
+  }
+}
diff --git a/data/src/main/java/org/apache/iceberg/data/TableScanIterable.java b/data/src/main/java/org/apache/iceberg/data/TableScanIterable.java
index 4ce36cd7dd8a..baa2320244ce 100644
--- a/data/src/main/java/org/apache/iceberg/data/TableScanIterable.java
+++ b/data/src/main/java/org/apache/iceberg/data/TableScanIterable.java
@@ -26,6 +26,7 @@
 import java.io.IOException;
 import java.util.Collections;
 import java.util.Iterator;
+import java.util.Map;
 import java.util.NoSuchElementException;
 import org.apache.iceberg.CombinedScanTask;
 import org.apache.iceberg.FileScanTask;
@@ -45,6 +46,7 @@
 import org.apache.iceberg.io.InputFile;
 import org.apache.iceberg.orc.ORC;
 import org.apache.iceberg.parquet.Parquet;
+import org.apache.iceberg.util.PartitionUtil;
 
 class TableScanIterable extends CloseableGroup implements CloseableIterable<Record> {
   private final TableOperations ops;
@@ -74,13 +76,15 @@ public Iterator<Record> iterator() {
   private CloseableIterable<Record> open(FileScanTask task) {
     InputFile input = ops.io().newInputFile(task.file().path().toString());
+    Map<Integer, ?> partition = PartitionUtil.constantsMap(task);
 
     // TODO: join to partition data from the manifest file
     switch (task.file().format()) {
       case AVRO:
         Avro.ReadBuilder avro = Avro.read(input)
             .project(projection)
-            .createReaderFunc(DataReader::create)
+            .createReaderFunc(
+                avroSchema -> DataReader.create(projection, avroSchema, partition))
             .split(task.start(), task.length());
 
         if (reuseContainers) {
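Aside (reviewer sketch, not part of the patch): constantsMap keys each partition value by the partition field's source column ID, which is what the readers match against the expected schema's field IDs. The schema, field IDs, and values below are invented for illustration:

import com.google.common.collect.ImmutableMap;
import java.util.Map;
import org.apache.iceberg.PartitionSpec;
import org.apache.iceberg.Schema;
import org.apache.iceberg.types.Types;

public class PartitionConstantsExample {
  public static void main(String[] args) {
    Schema schema = new Schema(
        Types.NestedField.required(1, "id", Types.LongType.get()),
        Types.NestedField.optional(2, "data", Types.StringType.get()),
        Types.NestedField.required(3, "date", Types.StringType.get()));

    PartitionSpec spec = PartitionSpec.builderFor(schema).identity("date").build();

    // For a data file written under date=2019-10-01, PartitionUtil.constantsMap(task)
    // (where task.spec() is the spec above) produces a map keyed by source column ID:
    Map<Integer, ?> expected = ImmutableMap.of(3, "2019-10-01");
    System.out.println(spec + " -> " + expected);
  }
}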
diff --git a/data/src/main/java/org/apache/iceberg/data/avro/DataReader.java b/data/src/main/java/org/apache/iceberg/data/avro/DataReader.java
index f975aa8a3f75..1d226c3ecbad 100644
--- a/data/src/main/java/org/apache/iceberg/data/avro/DataReader.java
+++ b/data/src/main/java/org/apache/iceberg/data/avro/DataReader.java
@@ -19,6 +19,7 @@
 
 package org.apache.iceberg.data.avro;
 
+import com.google.common.collect.ImmutableMap;
 import com.google.common.collect.MapMaker;
 import java.io.IOException;
 import java.util.HashMap;
@@ -45,7 +46,12 @@ public class DataReader<T> implements DatumReader<T> {
       ThreadLocal.withInitial(() -> new MapMaker().weakKeys().makeMap());
 
   public static <D> DataReader<D> create(org.apache.iceberg.Schema expectedSchema,
                                          Schema readSchema) {
-    return new DataReader<>(expectedSchema, readSchema);
+    return create(expectedSchema, readSchema, ImmutableMap.of());
+  }
+
+  public static <D> DataReader<D> create(org.apache.iceberg.Schema expectedSchema, Schema readSchema,
+                                         Map<Integer, ?> idToConstant) {
+    return new DataReader<>(expectedSchema, readSchema, idToConstant);
   }
 
   private final Schema readSchema;
@@ -53,9 +59,10 @@ public static <D> DataReader<D> create(org.apache.iceberg.Schema expectedSchema,
   private Schema fileSchema = null;
 
   @SuppressWarnings("unchecked")
-  private DataReader(org.apache.iceberg.Schema expectedSchema, Schema readSchema) {
+  private DataReader(org.apache.iceberg.Schema expectedSchema, Schema readSchema, Map<Integer, ?> idToConstant) {
     this.readSchema = readSchema;
-    this.reader = (ValueReader<T>) AvroSchemaWithTypeVisitor.visit(expectedSchema, readSchema, new ReadBuilder());
+    this.reader = (ValueReader<T>) AvroSchemaWithTypeVisitor
+        .visit(expectedSchema, readSchema, new ReadBuilder(idToConstant));
   }
 
   @Override
@@ -96,14 +103,16 @@ private ResolvingDecoder newResolver() {
   }
 
   private static class ReadBuilder extends AvroSchemaWithTypeVisitor<ValueReader<?>> {
+    private final Map<Integer, ?> idToConstant;
 
-    private ReadBuilder() {
+    private ReadBuilder(Map<Integer, ?> idToConstant) {
+      this.idToConstant = idToConstant;
     }
 
     @Override
     public ValueReader<?> record(Types.StructType struct, Schema record, List<String> names,
                                  List<ValueReader<?>> fields) {
-      return GenericReaders.struct(struct, fields);
+      return GenericReaders.struct(struct, fields, idToConstant);
     }
 
     @Override
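Aside (reviewer sketch, not part of the patch): with the new create overload, any caller on the generic data path can be wired the same way TableScanIterable is above. The helper below is hypothetical; the FileIO handle and table schema are placeholders:

import java.util.Map;
import org.apache.iceberg.FileScanTask;
import org.apache.iceberg.Schema;
import org.apache.iceberg.avro.Avro;
import org.apache.iceberg.data.Record;
import org.apache.iceberg.data.avro.DataReader;
import org.apache.iceberg.io.CloseableIterable;
import org.apache.iceberg.io.FileIO;
import org.apache.iceberg.io.InputFile;
import org.apache.iceberg.util.PartitionUtil;

class AvroTaskReader {
  // Hypothetical helper: opens one task's Avro file with identity-partition
  // values supplied as constants instead of being decoded from the file.
  static CloseableIterable<Record> open(FileIO io, FileScanTask task, Schema tableSchema) {
    InputFile input = io.newInputFile(task.file().path().toString());
    Map<Integer, ?> partition = PartitionUtil.constantsMap(task);
    return Avro.read(input)
        .project(tableSchema)
        .createReaderFunc(avroSchema -> DataReader.create(tableSchema, avroSchema, partition))
        .split(task.start(), task.length())
        .build();
  }
}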
diff --git a/data/src/main/java/org/apache/iceberg/data/avro/GenericReaders.java b/data/src/main/java/org/apache/iceberg/data/avro/GenericReaders.java
index 14e42699c200..7502d151121c 100644
--- a/data/src/main/java/org/apache/iceberg/data/avro/GenericReaders.java
+++ b/data/src/main/java/org/apache/iceberg/data/avro/GenericReaders.java
@@ -28,6 +28,7 @@
 import java.time.ZoneOffset;
 import java.time.temporal.ChronoUnit;
 import java.util.List;
+import java.util.Map;
 import org.apache.avro.io.Decoder;
 import org.apache.iceberg.avro.ValueReader;
 import org.apache.iceberg.avro.ValueReaders;
@@ -55,8 +56,8 @@ static ValueReader<OffsetDateTime> timestamptz() {
     return TimestamptzReader.INSTANCE;
   }
 
-  static ValueReader<Record> struct(StructType struct, List<ValueReader<?>> readers) {
-    return new GenericRecordReader(readers, struct);
+  static ValueReader<Record> struct(StructType struct, List<ValueReader<?>> readers, Map<Integer, ?> idToConstant) {
+    return new GenericRecordReader(readers, struct, idToConstant);
   }
 
   private static final OffsetDateTime EPOCH = Instant.ofEpochSecond(0).atOffset(ZoneOffset.UTC);
@@ -113,8 +114,8 @@ public OffsetDateTime read(Decoder decoder, Object reuse) throws IOException {
   private static class GenericRecordReader extends ValueReaders.StructReader<Record> {
     private final StructType structType;
 
-    private GenericRecordReader(List<ValueReader<?>> readers, StructType struct) {
-      super(readers);
+    private GenericRecordReader(List<ValueReader<?>> readers, StructType struct, Map<Integer, ?> idToConstant) {
+      super(readers, struct, idToConstant);
       this.structType = struct;
     }
diff --git a/spark/src/main/java/org/apache/iceberg/spark/data/SparkAvroReader.java b/spark/src/main/java/org/apache/iceberg/spark/data/SparkAvroReader.java
index 88977bdc6ac5..215a2654ebfb 100644
--- a/spark/src/main/java/org/apache/iceberg/spark/data/SparkAvroReader.java
+++ b/spark/src/main/java/org/apache/iceberg/spark/data/SparkAvroReader.java
@@ -19,6 +19,7 @@
 
 package org.apache.iceberg.spark.data;
 
+import com.google.common.collect.ImmutableMap;
 import com.google.common.collect.MapMaker;
 import java.io.IOException;
 import java.util.HashMap;
@@ -31,10 +32,12 @@
 import org.apache.avro.io.Decoder;
 import org.apache.avro.io.DecoderFactory;
 import org.apache.avro.io.ResolvingDecoder;
-import org.apache.iceberg.avro.AvroSchemaVisitor;
+import org.apache.iceberg.avro.AvroSchemaWithTypeVisitor;
 import org.apache.iceberg.avro.ValueReader;
 import org.apache.iceberg.avro.ValueReaders;
 import org.apache.iceberg.exceptions.RuntimeIOException;
+import org.apache.iceberg.types.Type;
+import org.apache.iceberg.types.Types;
 import org.apache.spark.sql.catalyst.InternalRow;
@@ -47,10 +50,15 @@ public class SparkAvroReader implements DatumReader<InternalRow> {
   private final ValueReader<InternalRow> reader;
   private Schema fileSchema = null;
 
+  public SparkAvroReader(org.apache.iceberg.Schema expectedSchema, Schema readSchema) {
+    this(expectedSchema, readSchema, ImmutableMap.of());
+  }
+
   @SuppressWarnings("unchecked")
-  public SparkAvroReader(Schema readSchema) {
+  public SparkAvroReader(org.apache.iceberg.Schema expectedSchema, Schema readSchema, Map<Integer, ?> constants) {
     this.readSchema = readSchema;
-    this.reader = (ValueReader<InternalRow>) AvroSchemaVisitor.visit(readSchema, new ReadBuilder());
+    this.reader = (ValueReader<InternalRow>) AvroSchemaWithTypeVisitor
+        .visit(expectedSchema, readSchema, new ReadBuilder(constants));
   }
 
   @Override
@@ -90,38 +98,42 @@ private ResolvingDecoder newResolver() {
     }
   }
 
-  private static class ReadBuilder extends AvroSchemaVisitor<ValueReader<?>> {
-    private ReadBuilder() {
+  private static class ReadBuilder extends AvroSchemaWithTypeVisitor<ValueReader<?>> {
+    private final Map<Integer, ?> idToConstant;
+
+    private ReadBuilder(Map<Integer, ?> idToConstant) {
+      this.idToConstant = idToConstant;
     }
 
     @Override
-    public ValueReader<?> record(Schema record, List<String> names, List<ValueReader<?>> fields) {
-      return SparkValueReaders.struct(fields);
+    public ValueReader<?> record(Types.StructType expected, Schema record, List<String> names,
+                                 List<ValueReader<?>> fields) {
+      return SparkValueReaders.struct(fields, expected, idToConstant);
     }
 
     @Override
-    public ValueReader<?> union(Schema union, List<ValueReader<?>> options) {
+    public ValueReader<?> union(Type expected, Schema union, List<ValueReader<?>> options) {
       return ValueReaders.union(options);
     }
 
     @Override
-    public ValueReader<?> array(Schema array, ValueReader<?> elementReader) {
-      LogicalType logical = array.getLogicalType();
-      if (logical != null && "map".equals(logical.getName())) {
-        ValueReader<?>[] keyValueReaders = ((SparkValueReaders.StructReader) elementReader).readers();
-        return SparkValueReaders.arrayMap(keyValueReaders[0], keyValueReaders[1]);
-      }
-
+    public ValueReader<?> array(Types.ListType expected, Schema array, ValueReader<?> elementReader) {
       return SparkValueReaders.array(elementReader);
     }
 
     @Override
-    public ValueReader<?> map(Schema map, ValueReader<?> valueReader) {
+    public ValueReader<?> map(Types.MapType expected, Schema map,
+                              ValueReader<?> keyReader, ValueReader<?> valueReader) {
+      return SparkValueReaders.arrayMap(keyReader, valueReader);
+    }
+
+    @Override
+    public ValueReader<?> map(Types.MapType expected, Schema map, ValueReader<?> valueReader) {
       return SparkValueReaders.map(SparkValueReaders.strings(), valueReader);
     }
 
     @Override
-    public ValueReader<?> primitive(Schema primitive) {
+    public ValueReader<?> primitive(Type.PrimitiveType expected, Schema primitive) {
       LogicalType logicalType = primitive.getLogicalType();
       if (logicalType != null) {
         switch (logicalType.getName()) {
diff --git a/spark/src/main/java/org/apache/iceberg/spark/data/SparkValueReaders.java b/spark/src/main/java/org/apache/iceberg/spark/data/SparkValueReaders.java
index 2670cd4fa18a..b799fe8c0cf5 100644
--- a/spark/src/main/java/org/apache/iceberg/spark/data/SparkValueReaders.java
+++ b/spark/src/main/java/org/apache/iceberg/spark/data/SparkValueReaders.java
@@ -27,12 +27,16 @@
 import java.nio.ByteOrder;
 import java.nio.charset.StandardCharsets;
 import java.util.List;
+import java.util.Map;
 import java.util.UUID;
-import org.apache.avro.Schema;
+import org.apache.avro.generic.GenericData;
 import org.apache.avro.io.Decoder;
-import org.apache.avro.io.ResolvingDecoder;
 import org.apache.avro.util.Utf8;
 import org.apache.iceberg.avro.ValueReader;
+import org.apache.iceberg.avro.ValueReaders;
+import org.apache.iceberg.types.Type;
+import org.apache.iceberg.types.Types;
+import org.apache.iceberg.util.ByteBuffers;
 import org.apache.spark.sql.catalyst.InternalRow;
 import org.apache.spark.sql.catalyst.expressions.GenericInternalRow;
 import org.apache.spark.sql.catalyst.util.ArrayBasedMapData;
@@ -74,8 +78,9 @@ static ValueReader<ArrayBasedMapData> map(ValueReader<?> keyReader, ValueReader<?> valueReader) {
     return new MapReader(keyReader, valueReader);
   }
 
-  static ValueReader<InternalRow> struct(List<ValueReader<?>> readers) {
-    return new StructReader(readers);
+  static ValueReader<InternalRow> struct(List<ValueReader<?>> readers, Types.StructType struct,
+                                         Map<Integer, ?> idToConstant) {
+    return new StructReader(readers, struct, idToConstant);
   }
 
   private static class StringReader implements ValueReader<UTF8String> {
@@ -253,46 +258,59 @@ public ArrayBasedMapData read(Decoder decoder, Object reuse) throws IOException {
     }
   }
 
-  static class StructReader implements ValueReader<InternalRow> {
-    private final ValueReader<?>[] readers;
+  static class StructReader extends ValueReaders.StructReader<InternalRow> {
+    private final int numFields;
 
-    private StructReader(List<ValueReader<?>> readers) {
-      this.readers = new ValueReader[readers.size()];
-      for (int i = 0; i < this.readers.length; i += 1) {
-        this.readers[i] = readers.get(i);
-      }
+    protected StructReader(List<ValueReader<?>> readers, Types.StructType struct, Map<Integer, ?> idToConstant) {
+      super(readers, struct, idToConstant);
+      this.numFields = readers.size();
     }
 
-    ValueReader<?>[] readers() {
-      return readers;
+    @Override
+    protected InternalRow reuseOrCreate(Object reuse) {
+      if (reuse instanceof GenericInternalRow && ((GenericInternalRow) reuse).numFields() == numFields) {
+        return (InternalRow) reuse;
+      }
+      return new GenericInternalRow(numFields);
     }
 
     @Override
-    public InternalRow read(Decoder decoder, Object reuse) throws IOException {
-      GenericInternalRow row = new GenericInternalRow(readers.length);
-      if (decoder instanceof ResolvingDecoder) {
-        // this may not set all of the fields. nulls are set by default.
-        for (Schema.Field field : ((ResolvingDecoder) decoder).readFieldOrder()) {
-          Object value = readers[field.pos()].read(decoder, null);
-          if (value != null) {
-            row.update(field.pos(), value);
-          } else {
-            row.setNullAt(field.pos());
-          }
-        }
+    protected Object get(InternalRow struct, int pos) {
+      return null;
+    }
 
+    @Override
+    protected void set(InternalRow struct, int pos, Object value) {
+      if (value != null) {
+        struct.update(pos, value);
       } else {
-        for (int i = 0; i < readers.length; i += 1) {
-          Object value = readers[i].read(decoder, null);
-          if (value != null) {
-            row.update(i, value);
-          } else {
-            row.setNullAt(i);
-          }
-        }
+        struct.setNullAt(pos);
       }
+    }
 
-      return row;
+    @Override
+    protected Object prepareConstant(Type type, Object value) {
+      switch (type.typeId()) {
+        case DECIMAL:
+          return Decimal.apply((BigDecimal) value);
+        case STRING:
+          if (value instanceof Utf8) {
+            Utf8 utf8 = (Utf8) value;
+            return UTF8String.fromBytes(utf8.getBytes(), 0, utf8.getByteLength());
+          }
+          return UTF8String.fromString(value.toString());
+        case FIXED:
+          if (value instanceof byte[]) {
+            return value;
+          } else if (value instanceof GenericData.Fixed) {
+            return ((GenericData.Fixed) value).bytes();
+          }
+          return ByteBuffers.toByteArray((ByteBuffer) value);
+        case BINARY:
+          return ByteBuffers.toByteArray((ByteBuffer) value);
+        default:
+      }
+      return value;
     }
   }
 }
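Aside (reviewer sketch, not part of the patch): prepareConstant is the hook that converts plain Java partition values into Spark's internal representations before they are injected into InternalRows. The inputs below are made-up examples of the conversions it performs:

import java.math.BigDecimal;
import java.nio.ByteBuffer;
import org.apache.iceberg.util.ByteBuffers;
import org.apache.spark.sql.types.Decimal;
import org.apache.spark.unsafe.types.UTF8String;

public class PrepareConstantExamples {
  public static void main(String[] args) {
    // STRING constants become UTF8String
    UTF8String str = UTF8String.fromString("2019-10-01");

    // DECIMAL constants become Spark's Decimal
    Decimal dec = Decimal.apply(new BigDecimal("12.34"));

    // BINARY (and ByteBuffer-backed FIXED) constants are copied out to byte[]
    byte[] bin = ByteBuffers.toByteArray(ByteBuffer.wrap(new byte[] {1, 2, 3}));

    System.out.printf("%s %s %d%n", str, dec, bin.length);
  }
}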
diff --git a/spark/src/main/java/org/apache/iceberg/spark/source/RowDataReader.java b/spark/src/main/java/org/apache/iceberg/spark/source/RowDataReader.java
index 3e4c21316edc..61c6fa227485 100644
--- a/spark/src/main/java/org/apache/iceberg/spark/source/RowDataReader.java
+++ b/spark/src/main/java/org/apache/iceberg/spark/source/RowDataReader.java
@@ -20,14 +20,18 @@
 package org.apache.iceberg.spark.source;
 
 import com.google.common.base.Preconditions;
+import com.google.common.collect.ImmutableMap;
 import com.google.common.collect.Iterators;
 import com.google.common.collect.Lists;
+import com.google.common.collect.Sets;
 import java.util.Iterator;
 import java.util.List;
+import java.util.Map;
 import java.util.Set;
 import org.apache.iceberg.CombinedScanTask;
 import org.apache.iceberg.DataFile;
 import org.apache.iceberg.DataTask;
+import org.apache.iceberg.FileFormat;
 import org.apache.iceberg.FileScanTask;
 import org.apache.iceberg.PartitionSpec;
 import org.apache.iceberg.Schema;
@@ -45,6 +49,7 @@
 import org.apache.iceberg.spark.data.SparkParquetReaders;
 import org.apache.iceberg.types.TypeUtil;
 import org.apache.iceberg.types.Types;
+import org.apache.iceberg.util.PartitionUtil;
 import org.apache.spark.rdd.InputFileBlockHolder;
 import org.apache.spark.sql.catalyst.InternalRow;
 import org.apache.spark.sql.catalyst.expressions.Attribute;
@@ -55,6 +60,7 @@
 import scala.collection.JavaConverters;
 
 class RowDataReader extends BaseDataReader<InternalRow> {
+  private static final Set<FileFormat> SUPPORTS_CONSTANTS = Sets.newHashSet(FileFormat.AVRO);
   // for some reason, the apply method can't be called from Java without reflection
   private static final DynMethods.UnboundMethod APPLY_PROJECTION = DynMethods.builder("apply")
       .impl(UnsafeProjection.class, InternalRow.class)
@@ -95,26 +101,31 @@ Iterator<InternalRow> open(FileScanTask task) {
     Iterator<InternalRow> iter;
 
     if (hasJoinedPartitionColumns) {
-      // schema used to read data files
-      Schema readSchema = TypeUtil.selectNot(requiredSchema, idColumns);
-      Schema partitionSchema = TypeUtil.select(requiredSchema, idColumns);
-      PartitionRowConverter convertToRow = new PartitionRowConverter(partitionSchema, spec);
-      JoinedRow joined = new JoinedRow();
-
-      InternalRow partition = convertToRow.apply(file.partition());
-      joined.withRight(partition);
-
-      // create joined rows and project from the joined schema to the final schema
-      iterSchema = TypeUtil.join(readSchema, partitionSchema);
-      iter = Iterators.transform(open(task, readSchema), joined::withLeft);
+      if (SUPPORTS_CONSTANTS.contains(file.format())) {
+        iterSchema = requiredSchema;
+        iter = open(task, requiredSchema, PartitionUtil.constantsMap(task));
+      } else {
+        // schema used to read data files
+        Schema readSchema = TypeUtil.selectNot(requiredSchema, idColumns);
+        Schema partitionSchema = TypeUtil.select(requiredSchema, idColumns);
+        PartitionRowConverter convertToRow = new PartitionRowConverter(partitionSchema, spec);
+        JoinedRow joined = new JoinedRow();
+
+        InternalRow partition = convertToRow.apply(file.partition());
+        joined.withRight(partition);
+
+        // create joined rows and project from the joined schema to the final schema
+        iterSchema = TypeUtil.join(readSchema, partitionSchema);
+        iter = Iterators.transform(open(task, readSchema, ImmutableMap.of()), joined::withLeft);
+      }
     } else if (hasExtraFilterColumns) {
       // add projection to the final schema
       iterSchema = requiredSchema;
-      iter = open(task, requiredSchema);
+      iter = open(task, requiredSchema, ImmutableMap.of());
     } else {
       // return the base iterator
       iterSchema = finalSchema;
-      iter = open(task, finalSchema);
+      iter = open(task, finalSchema, ImmutableMap.of());
     }
 
     // TODO: remove the projection by reporting the iterator's schema back to Spark
@@ -123,7 +134,7 @@ Iterator<InternalRow> open(FileScanTask task) {
         APPLY_PROJECTION.bind(projection(finalSchema, iterSchema))::invoke);
   }
 
-  private Iterator<InternalRow> open(FileScanTask task, Schema readSchema) {
+  private Iterator<InternalRow> open(FileScanTask task, Schema readSchema, Map<Integer, ?> idToConstant) {
     CloseableIterable<InternalRow> iter;
     if (task.isDataTask()) {
       iter = newDataIterable(task.asDataTask(), readSchema);
@@ -137,7 +148,7 @@ private Iterator<InternalRow> open(FileScanTask task, Schema readSchema) {
           break;
 
         case AVRO:
-          iter = newAvroIterable(location, task, readSchema);
+          iter = newAvroIterable(location, task, readSchema, idToConstant);
           break;
 
        case ORC:
@@ -158,12 +169,13 @@ private Iterator<InternalRow> open(FileScanTask task, Schema readSchema) {
   private CloseableIterable<InternalRow> newAvroIterable(
       InputFile location,
       FileScanTask task,
-      Schema readSchema) {
+      Schema projection,
+      Map<Integer, ?> idToConstant) {
     return Avro.read(location)
        .reuseContainers()
-        .project(readSchema)
+        .project(projection)
        .split(task.start(), task.length())
-        .createReaderFunc(SparkAvroReader::new)
+        .createReaderFunc(readSchema -> new SparkAvroReader(projection, readSchema, idToConstant))
        .build();
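Aside (reviewer sketch, not part of the patch): the net effect is that Avro scans fill identity-partition columns while rows are decoded, so the JoinedRow path is kept only for formats outside SUPPORTS_CONSTANTS (Parquet and ORC here). From the Spark side nothing changes; a hypothetical read, with a made-up table path and column names:

import org.apache.spark.sql.Dataset;
import org.apache.spark.sql.Row;
import org.apache.spark.sql.SparkSession;

public class ReadPartitionedAvroTable {
  public static void main(String[] args) {
    SparkSession spark = SparkSession.builder().appName("example").getOrCreate();

    Dataset<Row> df = spark.read()
        .format("iceberg")
        .load("hdfs://nn:8020/warehouse/db/events");

    // "event_date" is an identity partition column in this example; for Avro data
    // files its values are now injected as constants during decoding.
    df.select("id", "event_date").show();
  }
}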