From bf560fe992808a548ec1f6dbe915e3fffca4fdd1 Mon Sep 17 00:00:00 2001
From: samarthjain
Date: Mon, 4 May 2020 13:35:04 -0700
Subject: [PATCH 1/2] Refactor data reader classes for vectorized reads

---
 .../iceberg/spark/source/BaseDataReader.java | 101 +++++++++++++++-
 .../iceberg/spark/source/RowDataReader.java  | 114 ++++--------------
 2 files changed, 120 insertions(+), 95 deletions(-)

diff --git a/spark/src/main/java/org/apache/iceberg/spark/source/BaseDataReader.java b/spark/src/main/java/org/apache/iceberg/spark/source/BaseDataReader.java
index fab6680ca8ad..3b3ec3f05696 100644
--- a/spark/src/main/java/org/apache/iceberg/spark/source/BaseDataReader.java
+++ b/spark/src/main/java/org/apache/iceberg/spark/source/BaseDataReader.java
@@ -22,20 +22,37 @@
 import com.google.common.base.Preconditions;
 import com.google.common.collect.ImmutableMap;
 import com.google.common.collect.Iterables;
+import com.google.common.collect.Iterators;
+import com.google.common.collect.Lists;
 import java.io.Closeable;
 import java.io.IOException;
 import java.util.Collections;
 import java.util.Iterator;
+import java.util.List;
 import java.util.Map;
+import java.util.Set;
 import org.apache.iceberg.CombinedScanTask;
+import org.apache.iceberg.DataFile;
 import org.apache.iceberg.FileScanTask;
+import org.apache.iceberg.PartitionSpec;
+import org.apache.iceberg.Schema;
+import org.apache.iceberg.common.DynMethods;
 import org.apache.iceberg.encryption.EncryptedFiles;
 import org.apache.iceberg.encryption.EncryptionManager;
 import org.apache.iceberg.io.CloseableIterable;
 import org.apache.iceberg.io.FileIO;
 import org.apache.iceberg.io.InputFile;
+import org.apache.iceberg.spark.SparkSchemaUtil;
+import org.apache.iceberg.types.Types;
+import org.apache.iceberg.util.Pair;
 import org.apache.spark.rdd.InputFileBlockHolder;
+import org.apache.spark.sql.catalyst.InternalRow;
+import org.apache.spark.sql.catalyst.expressions.Attribute;
+import org.apache.spark.sql.catalyst.expressions.AttributeReference;
+import org.apache.spark.sql.catalyst.expressions.UnsafeProjection;
 import org.apache.spark.sql.sources.v2.reader.InputPartitionReader;
+import org.apache.spark.sql.types.StructType;
+import scala.collection.JavaConverters;
 
 /**
  * Base class of readers of type {@link InputPartitionReader} to read data as objects of type @param <T>
@@ -44,6 +61,15 @@
  */
 @SuppressWarnings("checkstyle:VisibilityModifier")
 abstract class BaseDataReader<T> implements InputPartitionReader<T> {
+  // for some reason, the apply method can't be called from Java without reflection
+  static final DynMethods.UnboundMethod APPLY_PROJECTION = DynMethods.builder("apply")
+      .impl(UnsafeProjection.class, InternalRow.class)
+      .build();
+
+  final Schema tableSchema;
+  private final Schema expectedSchema;
+  final boolean caseSensitive;
+
   private final Iterator<FileScanTask> tasks;
   private final FileIO fileIo;
   private final Map<String, InputFile> inputFiles;
@@ -52,7 +78,11 @@ abstract class BaseDataReader<T> implements InputPartitionReader<T> {
   Closeable currentCloseable;
   private T current = null;
 
-  BaseDataReader(CombinedScanTask task, FileIO fileIo, EncryptionManager encryptionManager) {
+  BaseDataReader(CombinedScanTask task, FileIO fileIo, EncryptionManager encryptionManager, Schema tableSchema,
+      Schema expectedSchema, boolean caseSensitive) {
+    this.tableSchema = tableSchema;
+    this.expectedSchema = expectedSchema;
+    this.caseSensitive = caseSensitive;
     this.fileIo = fileIo;
     this.tasks = task.files().iterator();
     Iterable<InputFile> decryptedFiles = encryptionManager.decrypt(Iterables.transform(
@@ -88,7 +118,74 @@ public T get() {
     return current;
   }
 
-  abstract Iterator<T> open(FileScanTask task);
+  abstract Pair<Schema, Iterator<T>> getJoinedSchemaAndIteratorWithIdentityPartition(
+      DataFile file, FileScanTask task,
+      Schema requiredSchema, Set<Integer> idColumns, PartitionSpec spec);
+
+  abstract Iterator<T> open(FileScanTask task, Schema readSchema, Map<Integer, ?> idToConstant);
+
+  private Iterator<T> open(FileScanTask task) {
+    DataFile file = task.file();
+
+    // update the current file for Spark's filename() function
+    InputFileBlockHolder.set(file.path().toString(), task.start(), task.length());
+
+    // schema or rows returned by readers
+    Schema finalSchema = expectedSchema;
+    PartitionSpec spec = task.spec();
+    Set<Integer> idColumns = spec.identitySourceIds();
+
+    // schema needed for the projection and filtering
+    StructType sparkType = SparkSchemaUtil.convert(finalSchema);
+    Schema requiredSchema = SparkSchemaUtil.prune(tableSchema, sparkType, task.residual(), caseSensitive);
+    boolean hasJoinedPartitionColumns = !idColumns.isEmpty();
+    boolean hasExtraFilterColumns = requiredSchema.columns().size() != finalSchema.columns().size();
+
+    Schema iterSchema;
+    Iterator<T> iter;
+
+    if (hasJoinedPartitionColumns) {
+      Pair<Schema, Iterator<T>> pair = getJoinedSchemaAndIteratorWithIdentityPartition(file, task, requiredSchema,
+          idColumns, spec);
+      iterSchema = pair.first();
+      iter = pair.second();
+    } else if (hasExtraFilterColumns) {
+      // add projection to the final schema
+      iterSchema = requiredSchema;
+      iter = open(task, requiredSchema, ImmutableMap.of());
+    } else {
+      // return the base iterator
+      iterSchema = finalSchema;
+      iter = open(task, finalSchema, ImmutableMap.of());
+    }
+
+    // TODO: remove the projection by reporting the iterator's schema back to Spark
+    return Iterators.transform(
+        iter,
+        APPLY_PROJECTION.bind(projection(finalSchema, iterSchema))::invoke);
+  }
+
+  static UnsafeProjection projection(Schema finalSchema, Schema readSchema) {
+    StructType struct = SparkSchemaUtil.convert(readSchema);
+
+    List<AttributeReference> refs = JavaConverters.seqAsJavaListConverter(struct.toAttributes()).asJava();
+    List<Attribute> attrs = Lists.newArrayListWithExpectedSize(struct.fields().length);
+    List<org.apache.spark.sql.catalyst.expressions.Expression> exprs =
+        Lists.newArrayListWithExpectedSize(struct.fields().length);
+
+    for (AttributeReference ref : refs) {
+      attrs.add(ref.toAttribute());
+    }
+
+    for (Types.NestedField field : finalSchema.columns()) {
+      int indexInReadSchema = struct.fieldIndex(field.name());
+      exprs.add(refs.get(indexInReadSchema));
+    }
+
+    return UnsafeProjection.create(
+        JavaConverters.asScalaBufferConverter(exprs).asScala().toSeq(),
+        JavaConverters.asScalaBufferConverter(attrs).asScala().toSeq());
+  }
 
   @Override
   public void close() throws IOException {
diff --git a/spark/src/main/java/org/apache/iceberg/spark/source/RowDataReader.java b/spark/src/main/java/org/apache/iceberg/spark/source/RowDataReader.java
index ec9aa706ef09..24b9b6a57a0f 100644
--- a/spark/src/main/java/org/apache/iceberg/spark/source/RowDataReader.java
+++ b/spark/src/main/java/org/apache/iceberg/spark/source/RowDataReader.java
@@ -22,12 +22,10 @@
 import com.google.common.base.Preconditions;
 import com.google.common.collect.ImmutableMap;
 import com.google.common.collect.Iterators;
-import com.google.common.collect.Lists;
 import com.google.common.collect.Sets;
 import java.math.BigDecimal;
 import java.nio.ByteBuffer;
 import java.util.Iterator;
-import java.util.List;
 import java.util.Map;
 import java.util.Set;
 import org.apache.avro.generic.GenericData;
@@ -40,109 +38,61 @@
 import org.apache.iceberg.PartitionSpec;
 import org.apache.iceberg.Schema;
 import org.apache.iceberg.avro.Avro;
-import org.apache.iceberg.common.DynMethods;
 import org.apache.iceberg.encryption.EncryptionManager;
 import org.apache.iceberg.io.CloseableIterable;
 import org.apache.iceberg.io.FileIO;
 import org.apache.iceberg.io.InputFile;
 import org.apache.iceberg.orc.ORC;
 import org.apache.iceberg.parquet.Parquet;
-import org.apache.iceberg.spark.SparkSchemaUtil;
 import org.apache.iceberg.spark.data.SparkAvroReader;
 import org.apache.iceberg.spark.data.SparkOrcReader;
 import org.apache.iceberg.spark.data.SparkParquetReaders;
 import org.apache.iceberg.types.Type;
 import org.apache.iceberg.types.TypeUtil;
-import org.apache.iceberg.types.Types;
 import org.apache.iceberg.util.ByteBuffers;
+import org.apache.iceberg.util.Pair;
 import org.apache.iceberg.util.PartitionUtil;
-import org.apache.spark.rdd.InputFileBlockHolder;
 import org.apache.spark.sql.catalyst.InternalRow;
-import org.apache.spark.sql.catalyst.expressions.Attribute;
-import org.apache.spark.sql.catalyst.expressions.AttributeReference;
 import org.apache.spark.sql.catalyst.expressions.JoinedRow;
-import org.apache.spark.sql.catalyst.expressions.UnsafeProjection;
 import org.apache.spark.sql.types.Decimal;
-import org.apache.spark.sql.types.StructType;
 import org.apache.spark.unsafe.types.UTF8String;
-import scala.collection.JavaConverters;
 
 class RowDataReader extends BaseDataReader<InternalRow> {
   private static final Set<FileFormat> SUPPORTS_CONSTANTS = Sets.newHashSet(FileFormat.AVRO, FileFormat.PARQUET);
-  // for some reason, the apply method can't be called from Java without reflection
-  private static final DynMethods.UnboundMethod APPLY_PROJECTION = DynMethods.builder("apply")
-      .impl(UnsafeProjection.class, InternalRow.class)
-      .build();
-
-  private final Schema tableSchema;
-  private final Schema expectedSchema;
-  private final boolean caseSensitive;
 
   RowDataReader(
       CombinedScanTask task, Schema tableSchema, Schema expectedSchema, FileIO fileIo,
       EncryptionManager encryptionManager, boolean caseSensitive) {
-    super(task, fileIo, encryptionManager);
-    this.tableSchema = tableSchema;
-    this.expectedSchema = expectedSchema;
-    this.caseSensitive = caseSensitive;
+    super(task, fileIo, encryptionManager, tableSchema, expectedSchema, caseSensitive);
   }
 
   @Override
-  Iterator<InternalRow> open(FileScanTask task) {
-    DataFile file = task.file();
-
-    // update the current file for Spark's filename() function
-    InputFileBlockHolder.set(file.path().toString(), task.start(), task.length());
-
-    // schema or rows returned by readers
-    Schema finalSchema = expectedSchema;
-    PartitionSpec spec = task.spec();
-    Set<Integer> idColumns = spec.identitySourceIds();
-
-    // schema needed for the projection and filtering
-    StructType sparkType = SparkSchemaUtil.convert(finalSchema);
-    Schema requiredSchema = SparkSchemaUtil.prune(tableSchema, sparkType, task.residual(), caseSensitive);
-    boolean hasJoinedPartitionColumns = !idColumns.isEmpty();
-    boolean hasExtraFilterColumns = requiredSchema.columns().size() != finalSchema.columns().size();
-
+  Pair<Schema, Iterator<InternalRow>> getJoinedSchemaAndIteratorWithIdentityPartition(DataFile file, FileScanTask task,
+      Schema requiredSchema, Set<Integer> idColumns, PartitionSpec spec) {
     Schema iterSchema;
     Iterator<InternalRow> iter;
-
-    if (hasJoinedPartitionColumns) {
-      if (SUPPORTS_CONSTANTS.contains(file.format())) {
-        iterSchema = requiredSchema;
-        iter = open(task, requiredSchema, PartitionUtil.constantsMap(task, RowDataReader::convertConstant));
-      } else {
-        // schema used to read data files
-        Schema readSchema = TypeUtil.selectNot(requiredSchema, idColumns);
-        Schema partitionSchema = TypeUtil.select(requiredSchema, idColumns);
-        PartitionRowConverter convertToRow = new PartitionRowConverter(partitionSchema, spec);
-        JoinedRow joined = new JoinedRow();
-
-        InternalRow partition = convertToRow.apply(file.partition());
-        joined.withRight(partition);
-
-        // create joined rows and project from the joined schema to the final schema
-        iterSchema = TypeUtil.join(readSchema, partitionSchema);
-        iter = Iterators.transform(open(task, readSchema, ImmutableMap.of()), joined::withLeft);
-      }
-    } else if (hasExtraFilterColumns) {
-      // add projection to the final schema
+    if (SUPPORTS_CONSTANTS.contains(file.format())) {
       iterSchema = requiredSchema;
-      iter = open(task, requiredSchema, ImmutableMap.of());
+      iter = open(task, requiredSchema, PartitionUtil.constantsMap(task, RowDataReader::convertConstant));
     } else {
-      // return the base iterator
-      iterSchema = finalSchema;
-      iter = open(task, finalSchema, ImmutableMap.of());
+      // schema used to read data files
+      Schema readSchema = TypeUtil.selectNot(requiredSchema, idColumns);
+      Schema partitionSchema = TypeUtil.select(requiredSchema, idColumns);
+      PartitionRowConverter convertToRow = new PartitionRowConverter(partitionSchema, spec);
+      JoinedRow joined = new JoinedRow();
+
+      InternalRow partition = convertToRow.apply(file.partition());
+      joined.withRight(partition);
+
+      // create joined rows and project from the joined schema to the final schema
+      iterSchema = TypeUtil.join(readSchema, partitionSchema);
+      iter = Iterators.transform(open(task, readSchema, ImmutableMap.of()), joined::withLeft);
     }
-
-    // TODO: remove the projection by reporting the iterator's schema back to Spark
-    return Iterators.transform(
-        iter,
-        APPLY_PROJECTION.bind(projection(finalSchema, iterSchema))::invoke);
+    return Pair.of(iterSchema, iter);
   }
 
-  private Iterator<InternalRow> open(FileScanTask task, Schema readSchema, Map<Integer, ?> idToConstant) {
+  @Override
+  Iterator<InternalRow> open(FileScanTask task, Schema readSchema, Map<Integer, ?> idToConstant) {
     CloseableIterable<InternalRow> iter;
     if (task.isDataTask()) {
       iter = newDataIterable(task.asDataTask(), readSchema);
@@ -221,28 +171,6 @@ private CloseableIterable<InternalRow> newDataIterable(DataTask task, Schema rea
         asSparkRows,
         APPLY_PROJECTION.bind(projection(readSchema, tableSchema))::invoke);
   }
 
-  private static UnsafeProjection projection(Schema finalSchema, Schema readSchema) {
-    StructType struct = SparkSchemaUtil.convert(readSchema);
-
-    List<AttributeReference> refs = JavaConverters.seqAsJavaListConverter(struct.toAttributes()).asJava();
-    List<Attribute> attrs = Lists.newArrayListWithExpectedSize(struct.fields().length);
-    List<org.apache.spark.sql.catalyst.expressions.Expression> exprs =
-        Lists.newArrayListWithExpectedSize(struct.fields().length);
-
-    for (AttributeReference ref : refs) {
-      attrs.add(ref.toAttribute());
-    }
-
-    for (Types.NestedField field : finalSchema.columns()) {
-      int indexInReadSchema = struct.fieldIndex(field.name());
-      exprs.add(refs.get(indexInReadSchema));
-    }
-
-    return UnsafeProjection.create(
-        JavaConverters.asScalaBufferConverter(exprs).asScala().toSeq(),
-        JavaConverters.asScalaBufferConverter(attrs).asScala().toSeq());
-  }
-
   private static Object convertConstant(Type type, Object value) {
     if (value == null) {
       return null;

From cf8226e950273a08259738ee2eb761b3565b1462 Mon Sep 17 00:00:00 2001
From: samarthjain
Date: Tue, 5 May 2020 11:26:12 -0700
Subject: [PATCH 2/2] Add javadoc

---
 .../java/org/apache/iceberg/spark/source/BaseDataReader.java | 4 ++++
 1 file changed, 4 insertions(+)

diff --git a/spark/src/main/java/org/apache/iceberg/spark/source/BaseDataReader.java b/spark/src/main/java/org/apache/iceberg/spark/source/BaseDataReader.java
index 3b3ec3f05696..60b93bedf588 100644
--- a/spark/src/main/java/org/apache/iceberg/spark/source/BaseDataReader.java
+++ b/spark/src/main/java/org/apache/iceberg/spark/source/BaseDataReader.java
@@ -118,6 +118,10 @@ public T get() {
     return current;
   }
 
+  /**
+   * Return a {@link Pair} of {@link Schema} and {@link Iterator} over records of type T that include the identity
+   * partition columns being projected.
+   */
   abstract Pair<Schema, Iterator<T>> getJoinedSchemaAndIteratorWithIdentityPartition(
       DataFile file, FileScanTask task,
      Schema requiredSchema, Set<Integer> idColumns, PartitionSpec spec);
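
Note on the projection helper moved into BaseDataReader: the sketch below shows what projection(finalSchema, readSchema) produces, an UnsafeProjection that prunes and reorders the columns a file reader returns into the schema Spark asked for. It is illustrative only and not part of this patch: the class name ProjectionExample and the sample schemas are made up, the snippet assumes it lives in org.apache.iceberg.spark.source (the helper is package-private), and it reuses the patch's DynMethods workaround because UnsafeProjection.apply cannot be called directly from Java.

// Illustrative sketch, not part of the patch. Assumes it compiles in the
// org.apache.iceberg.spark.source package so the package-private helper is visible.
import org.apache.iceberg.Schema;
import org.apache.iceberg.common.DynMethods;
import org.apache.iceberg.types.Types;
import org.apache.spark.sql.catalyst.InternalRow;
import org.apache.spark.sql.catalyst.expressions.GenericInternalRow;
import org.apache.spark.sql.catalyst.expressions.UnsafeProjection;
import org.apache.spark.unsafe.types.UTF8String;

public class ProjectionExample {
  public static void main(String[] args) {
    // Schema produced by the file reader: id, data, category.
    Schema readSchema = new Schema(
        Types.NestedField.required(1, "id", Types.LongType.get()),
        Types.NestedField.required(2, "data", Types.StringType.get()),
        Types.NestedField.required(3, "category", Types.StringType.get()));

    // Schema Spark asked for: category, id (pruned and reordered).
    Schema finalSchema = new Schema(
        Types.NestedField.required(3, "category", Types.StringType.get()),
        Types.NestedField.required(1, "id", Types.LongType.get()));

    // Build the projection exactly as open(FileScanTask) does after choosing iterSchema.
    UnsafeProjection reorder = BaseDataReader.projection(finalSchema, readSchema);

    // Same reflection workaround the patch uses for UnsafeProjection.apply.
    DynMethods.UnboundMethod apply = DynMethods.builder("apply")
        .impl(UnsafeProjection.class, InternalRow.class)
        .build();

    InternalRow readRow = new GenericInternalRow(new Object[] {
        1L, UTF8String.fromString("a"), UTF8String.fromString("books")});

    InternalRow projected = apply.bind(reorder).invoke(readRow);
    // projected now holds ("books", 1L) in final-schema order.
    System.out.println(projected.getUTF8String(0) + ", " + projected.getLong(1));
  }
}

This per-row copy into an UnsafeRow is what the TODO in open(FileScanTask) refers to: if the iterator's actual schema could be reported back to Spark, the extra projection would not be needed.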