diff --git a/data/src/main/java/org/apache/iceberg/data/TableScanIterable.java b/data/src/main/java/org/apache/iceberg/data/TableScanIterable.java
index 074fcc8c074f..3481fc51de02 100644
--- a/data/src/main/java/org/apache/iceberg/data/TableScanIterable.java
+++ b/data/src/main/java/org/apache/iceberg/data/TableScanIterable.java
@@ -47,6 +47,7 @@
 import org.apache.iceberg.relocated.com.google.common.base.Preconditions;
 import org.apache.iceberg.relocated.com.google.common.collect.Iterables;
 import org.apache.iceberg.relocated.com.google.common.collect.Lists;
+import org.apache.iceberg.types.TypeUtil;
 import org.apache.iceberg.util.PartitionUtil;
 
 class TableScanIterable extends CloseableGroup implements CloseableIterable<Record> {
@@ -106,8 +107,9 @@ private CloseableIterable<Record> open(FileScanTask task) {
         return parquet.build();
 
       case ORC:
+        Schema projectionWithoutConstants = TypeUtil.selectNot(projection, partition.keySet());
         ORC.ReadBuilder orc = ORC.read(input)
-            .project(projection)
+            .project(projectionWithoutConstants)
             .createReaderFunc(fileSchema -> GenericOrcReader.buildReader(projection, fileSchema, partition))
             .split(task.start(), task.length())
             .filter(task.residual());
diff --git a/mr/src/main/java/org/apache/iceberg/mr/mapreduce/IcebergInputFormat.java b/mr/src/main/java/org/apache/iceberg/mr/mapreduce/IcebergInputFormat.java
index 3220f224ef72..49c8ddb05bad 100644
--- a/mr/src/main/java/org/apache/iceberg/mr/mapreduce/IcebergInputFormat.java
+++ b/mr/src/main/java/org/apache/iceberg/mr/mapreduce/IcebergInputFormat.java
@@ -321,8 +321,10 @@ private CloseableIterable<T> newParquetIterable(InputFile inputFile, FileScanTas
     }
 
     private CloseableIterable<T> newOrcIterable(InputFile inputFile, FileScanTask task, Schema readSchema) {
+      Map<Integer, ?> idToConstant = constantsMap(task, IdentityPartitionConverters::convertConstant);
+      Schema readSchemaWithoutConstants = TypeUtil.selectNot(readSchema, idToConstant.keySet());
       ORC.ReadBuilder orcReadBuilder = ORC.read(inputFile)
-          .project(readSchema)
+          .project(readSchemaWithoutConstants)
           .filter(task.residual())
           .caseSensitive(caseSensitive)
           .split(task.start(), task.length());
@@ -335,7 +337,7 @@ private CloseableIterable<T> newOrcIterable(InputFile inputFile, FileScanTask ta
         case GENERIC:
           orcReadBuilder.createReaderFunc(
               fileSchema -> GenericOrcReader.buildReader(
-                  readSchema, fileSchema, constantsMap(task, IdentityPartitionConverters::convertConstant)));
+                  readSchema, fileSchema, idToConstant));
       }
 
       return applyResidualFiltering(orcReadBuilder.build(), task.residual(), readSchema);
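The change repeated in both files above is the same schema split: identity-partition columns have no backing bytes in the ORC data files, so the projection handed to `ORC.read(...).project(...)` must exclude them, while the row materializer (`GenericOrcReader.buildReader`) still receives the full projection plus an id-to-constant map so it can fill those columns on every row. A minimal, self-contained sketch of that split — the field ids, names, and partition value here are hypothetical, not taken from this PR:

```java
import java.util.Map;
import org.apache.iceberg.Schema;
import org.apache.iceberg.relocated.com.google.common.collect.ImmutableMap;
import org.apache.iceberg.types.TypeUtil;
import org.apache.iceberg.types.Types;

public class ProjectionSplitSketch {
  public static void main(String[] args) {
    // Full projection; "date" (field id 3) is an identity partition column.
    Schema projection = new Schema(
        Types.NestedField.required(1, "id", Types.LongType.get()),
        Types.NestedField.optional(2, "data", Types.StringType.get()),
        Types.NestedField.required(3, "date", Types.StringType.get()));

    // Constant values recovered from the task's partition data (hypothetical).
    Map<Integer, ?> idToConstant = ImmutableMap.of(3, "2020-03-20");

    // Only "id" and "data" are requested from the ORC file; "date" is
    // injected from idToConstant at materialization time.
    Schema fileProjection = TypeUtil.selectNot(projection, idToConstant.keySet());
    System.out.println(fileProjection.asStruct());
  }
}
```

Before this change, the full projection asked ORC for columns that do not exist in the data files; pruning them with `TypeUtil.selectNot` and re-attaching the constants in the value readers is the fix this diff applies to every ORC read path.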
diff --git a/orc/src/main/java/org/apache/iceberg/orc/OrcValueReaders.java b/orc/src/main/java/org/apache/iceberg/orc/OrcValueReaders.java
index e3aede03fd53..d5075e47b549 100644
--- a/orc/src/main/java/org/apache/iceberg/orc/OrcValueReaders.java
+++ b/orc/src/main/java/org/apache/iceberg/orc/OrcValueReaders.java
@@ -22,7 +22,6 @@
 import java.util.Arrays;
 import java.util.List;
 import java.util.Map;
-import org.apache.iceberg.relocated.com.google.common.collect.Lists;
 import org.apache.iceberg.types.Types;
 import org.apache.orc.storage.ql.exec.vector.BytesColumnVector;
 import org.apache.orc.storage.ql.exec.vector.ColumnVector;
@@ -59,6 +58,10 @@ public static OrcValueReader<byte[]> bytes() {
     return BytesReader.INSTANCE;
   }
 
+  public static <C> OrcValueReader<C> constants(C constant) {
+    return new ConstantReader<>(constant);
+  }
+
   private static class BooleanReader implements OrcValueReader<Boolean> {
     static final BooleanReader INSTANCE = new BooleanReader();
 
@@ -136,31 +139,22 @@ public byte[] nonNullRead(ColumnVector vector, int row) {
 
   public abstract static class StructReader<T> implements OrcValueReader<T> {
     private final OrcValueReader<?>[] readers;
-    private final int[] positions;
-    private final Object[] constants;
-
-    protected StructReader(List<OrcValueReader<?>> readers) {
-      this.readers = readers.toArray(new OrcValueReader[0]);
-      this.positions = new int[0];
-      this.constants = new Object[0];
-    }
+    private final boolean[] isConstantField;
 
     protected StructReader(List<OrcValueReader<?>> readers, Types.StructType struct, Map<Integer, ?> idToConstant) {
-      this.readers = readers.toArray(new OrcValueReader[0]);
       List<Types.NestedField> fields = struct.fields();
-      List<Integer> positionList = Lists.newArrayListWithCapacity(fields.size());
-      List<Object> constantList = Lists.newArrayListWithCapacity(fields.size());
-      for (int pos = 0; pos < fields.size(); pos += 1) {
+      this.readers = new OrcValueReader[fields.size()];
+      this.isConstantField = new boolean[fields.size()];
+      for (int pos = 0, readerIndex = 0; pos < fields.size(); pos += 1) {
         Types.NestedField field = fields.get(pos);
-        Object constant = idToConstant.get(field.fieldId());
-        if (constant != null) {
-          positionList.add(pos);
-          constantList.add(idToConstant.get(field.fieldId()));
+        if (idToConstant.containsKey(field.fieldId())) {
+          this.isConstantField[pos] = true;
+          this.readers[pos] = constants(idToConstant.get(field.fieldId()));
+        } else {
+          this.readers[pos] = readers.get(readerIndex);
+          readerIndex++;
         }
       }
-
-      this.positions = positionList.stream().mapToInt(Integer::intValue).toArray();
-      this.constants = constantList.toArray();
     }
 
     protected abstract T create();
@@ -178,15 +172,35 @@ public T nonNullRead(ColumnVector vector, int row) {
     }
 
     private T readInternal(T struct, ColumnVector[] columnVectors, int row) {
-      for (int c = 0; c < readers.length; ++c) {
-        set(struct, c, reader(c).read(columnVectors[c], row));
+      for (int c = 0, vectorIndex = 0; c < readers.length; ++c) {
+        ColumnVector vector;
+        if (isConstantField[c]) {
+          vector = null;
+        } else {
+          vector = columnVectors[vectorIndex];
+          vectorIndex++;
+        }
+        set(struct, c, reader(c).read(vector, row));
       }
+      return struct;
+    }
+  }
 
-      for (int i = 0; i < positions.length; i += 1) {
-        set(struct, positions[i], constants[i]);
-      }
+  private static class ConstantReader<C> implements OrcValueReader<C> {
+    private final C constant;
 
-      return struct;
+    private ConstantReader(C constant) {
+      this.constant = constant;
+    }
+
+    @Override
+    public C read(ColumnVector ignored, int ignoredRow) {
+      return constant;
+    }
+
+    @Override
+    public C nonNullRead(ColumnVector ignored, int ignoredRow) {
+      return constant;
     }
   }
 }
diff --git a/spark/src/main/java/org/apache/iceberg/spark/data/SparkOrcValueReaders.java b/spark/src/main/java/org/apache/iceberg/spark/data/SparkOrcValueReaders.java
index 55e311788caa..5add4994aab0 100644
--- a/spark/src/main/java/org/apache/iceberg/spark/data/SparkOrcValueReaders.java
+++ b/spark/src/main/java/org/apache/iceberg/spark/data/SparkOrcValueReaders.java
@@ -120,7 +120,7 @@ static class StructReader extends OrcValueReaders.StructReader<InternalRow> {
 
     protected StructReader(List<OrcValueReader<?>> readers, Types.StructType struct, Map<Integer, ?> idToConstant) {
       super(readers, struct, idToConstant);
-      this.numFields = readers.size();
+      this.numFields = struct.fields().size();
    }
 
     @Override
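Two details in the rewritten `StructReader` are easy to miss. First, `readers` is now indexed by struct position and includes a `ConstantReader` for each constant field, while `columnVectors` only holds columns physically read from the file; `readInternal` therefore walks two cursors and hands constant readers a null vector, which is safe only because `ConstantReader` overrides `read` directly. Second, the Spark fix follows from the same shift: the `readers` list passed to the constructor now covers only non-constant fields, so `numFields` must come from `struct.fields().size()`. A tiny sketch of the new factory's contract, using a hypothetical constant value:

```java
import org.apache.iceberg.orc.OrcValueReader;
import org.apache.iceberg.orc.OrcValueReaders;

public class ConstantReaderSketch {
  public static void main(String[] args) {
    // The reader ignores its ColumnVector entirely, so a null vector is part
    // of the contract -- StructReader relies on this for constant fields.
    OrcValueReader<String> dateReader = OrcValueReaders.constants("2020-03-20");
    System.out.println(dateReader.read(null, 0));  // always "2020-03-20"
  }
}
```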
diff --git a/spark/src/main/java/org/apache/iceberg/spark/source/RowDataReader.java b/spark/src/main/java/org/apache/iceberg/spark/source/RowDataReader.java
index 4ebf837c875e..16d5cb9cffca 100644
--- a/spark/src/main/java/org/apache/iceberg/spark/source/RowDataReader.java
+++ b/spark/src/main/java/org/apache/iceberg/spark/source/RowDataReader.java
@@ -178,8 +178,9 @@ private CloseableIterable<InternalRow> newOrcIterable(
       FileScanTask task,
       Schema readSchema,
       Map<Integer, ?> idToConstant) {
+    Schema readSchemaWithoutConstants = TypeUtil.selectNot(readSchema, idToConstant.keySet());
     return ORC.read(location)
-        .project(readSchema)
+        .project(readSchemaWithoutConstants)
         .split(task.start(), task.length())
         .createReaderFunc(readOrcSchema -> new SparkOrcReader(readSchema, readOrcSchema, idToConstant))
         .filter(task.residual())
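Taken together, the Spark read path after this diff prunes constants from the file projection but constructs the row reader with the full schema and the constant map, so the produced rows still contain every projected column. A condensed, illustrative recap of that caller pattern — `location`, `task`, `readSchema`, and `idToConstant` are assumed in scope as in the hunk above, and the trailing `.build()` is assumed since the hunk is cut off at `.filter(...)`:

```java
// Illustrative fragment, not a complete method from this PR.
Schema readSchemaWithoutConstants = TypeUtil.selectNot(readSchema, idToConstant.keySet());

CloseableIterable<InternalRow> rows = ORC.read(location)
    .project(readSchemaWithoutConstants)  // file projection: no constant columns
    .split(task.start(), task.length())
    .createReaderFunc(readOrcSchema ->
        new SparkOrcReader(readSchema, readOrcSchema, idToConstant))  // full schema + constants
    .filter(task.residual())
    .build();
```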