diff --git a/spark/v3.4/build.gradle b/spark/v3.4/build.gradle
index 0f2d0673c9fd..6eb26e8b73c1 100644
--- a/spark/v3.4/build.gradle
+++ b/spark/v3.4/build.gradle
@@ -75,7 +75,7 @@ project(":iceberg-spark:iceberg-spark-${sparkMajorVersion}_${scalaVersion}") {
exclude group: 'org.roaringbitmap'
}
- compileOnly "org.apache.datafusion:comet-spark-spark${sparkMajorVersion}_${scalaVersion}:0.3.0"
+ compileOnly "org.apache.datafusion:comet-spark-spark${sparkMajorVersion}_${scalaVersion}:0.5.0"
implementation libs.parquet.column
implementation libs.parquet.hadoop
@@ -185,7 +185,7 @@ project(":iceberg-spark:iceberg-spark-extensions-${sparkMajorVersion}_${scalaVer
testImplementation libs.avro.avro
testImplementation libs.parquet.hadoop
testImplementation libs.junit.vintage.engine
- testImplementation "org.apache.datafusion:comet-spark-spark${sparkMajorVersion}_${scalaVersion}:0.3.0"
+ testImplementation "org.apache.datafusion:comet-spark-spark${sparkMajorVersion}_${scalaVersion}:0.5.0"
// Required because we remove antlr plugin dependencies from the compile configuration, see note above
runtimeOnly libs.antlr.runtime
diff --git a/spark/v3.4/spark/src/main/java/org/apache/iceberg/spark/data/vectorized/CometDeleteColumnReader.java b/spark/v3.4/spark/src/main/java/org/apache/iceberg/spark/data/vectorized/CometDeleteColumnReader.java
index d834d1f899a2..4a28fc51da9b 100644
--- a/spark/v3.4/spark/src/main/java/org/apache/iceberg/spark/data/vectorized/CometDeleteColumnReader.java
+++ b/spark/v3.4/spark/src/main/java/org/apache/iceberg/spark/data/vectorized/CometDeleteColumnReader.java
@@ -53,7 +53,8 @@ private static class DeleteColumnReader extends MetadataColumnReader {
DataTypes.BooleanType,
TypeUtil.convertToParquet(
new StructField("_deleted", DataTypes.BooleanType, false, Metadata.empty())),
- false /* useDecimal128 = false */);
+ false /* useDecimal128 = false */,
+ false /* isConstant */);
this.isDeleted = new boolean[0];
}
diff --git a/spark/v3.4/spark/src/main/java/org/apache/iceberg/spark/data/vectorized/CometPositionColumnReader.java b/spark/v3.4/spark/src/main/java/org/apache/iceberg/spark/data/vectorized/CometPositionColumnReader.java
index ac68352fd29a..1949a717982a 100644
--- a/spark/v3.4/spark/src/main/java/org/apache/iceberg/spark/data/vectorized/CometPositionColumnReader.java
+++ b/spark/v3.4/spark/src/main/java/org/apache/iceberg/spark/data/vectorized/CometPositionColumnReader.java
@@ -42,7 +42,11 @@ private static class PositionColumnReader extends MetadataColumnReader {
private long position;
PositionColumnReader(ColumnDescriptor descriptor) {
- super(DataTypes.LongType, descriptor, false /* useDecimal128 = false */);
+ super(
+ DataTypes.LongType,
+ descriptor,
+ false /* useDecimal128 = false */,
+ false /* isConstant */);
}
@Override
diff --git a/spark/v3.5/build.gradle b/spark/v3.5/build.gradle
index b5a182f3678e..e2d2c7a7ac07 100644
--- a/spark/v3.5/build.gradle
+++ b/spark/v3.5/build.gradle
@@ -52,6 +52,8 @@ project(":iceberg-spark:iceberg-spark-${sparkMajorVersion}_${scalaVersion}") {
dependencies {
implementation project(path: ':iceberg-bundled-guava', configuration: 'shadow')
api project(':iceberg-api')
+ annotationProcessor libs.immutables.value
+ compileOnly libs.immutables.value
implementation project(':iceberg-common')
implementation project(':iceberg-core')
implementation project(':iceberg-data')
@@ -73,6 +75,8 @@ project(":iceberg-spark:iceberg-spark-${sparkMajorVersion}_${scalaVersion}") {
exclude group: 'org.roaringbitmap'
}
+ compileOnly "org.apache.datafusion:comet-spark-spark${sparkMajorVersion}_${scalaVersion}:0.5.0"
+
implementation libs.parquet.column
implementation libs.parquet.hadoop
@@ -179,6 +183,7 @@ project(":iceberg-spark:iceberg-spark-extensions-${sparkMajorVersion}_${scalaVer
testImplementation libs.avro.avro
testImplementation libs.parquet.hadoop
testImplementation libs.awaitility
+ testImplementation "org.apache.datafusion:comet-spark-spark${sparkMajorVersion}_${scalaVersion}:0.5.0"
// Required because we remove antlr plugin dependencies from the compile configuration, see note above
runtimeOnly libs.antlr.runtime
diff --git a/spark/v3.5/spark/src/main/java/org/apache/iceberg/spark/OrcBatchReadConf.java b/spark/v3.5/spark/src/main/java/org/apache/iceberg/spark/OrcBatchReadConf.java
new file mode 100644
index 000000000000..d3b339d60e3f
--- /dev/null
+++ b/spark/v3.5/spark/src/main/java/org/apache/iceberg/spark/OrcBatchReadConf.java
@@ -0,0 +1,27 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.iceberg.spark;
+
+import java.io.Serializable;
+import org.immutables.value.Value;
+
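+/**
+ * Batch read configuration for ORC. The immutables annotation processor generates the
+ * ImmutableOrcBatchReadConf builder used by SparkBatch when creating reader factories.
+ */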
+@Value.Immutable
+public interface OrcBatchReadConf extends Serializable {
+ int batchSize();
+}
diff --git a/spark/v3.5/spark/src/main/java/org/apache/iceberg/spark/ParquetBatchReadConf.java b/spark/v3.5/spark/src/main/java/org/apache/iceberg/spark/ParquetBatchReadConf.java
new file mode 100644
index 000000000000..442d728d4d69
--- /dev/null
+++ b/spark/v3.5/spark/src/main/java/org/apache/iceberg/spark/ParquetBatchReadConf.java
@@ -0,0 +1,29 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.iceberg.spark;
+
+import java.io.Serializable;
+import org.immutables.value.Value;
+
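+/**
+ * Batch read configuration for Parquet. Carries the batch size and the {@link ParquetReaderType}
+ * that selects between the built-in Iceberg reader and the Comet reader.
+ */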
+@Value.Immutable
+public interface ParquetBatchReadConf extends Serializable {
+ int batchSize();
+
+ ParquetReaderType readerType();
+}
diff --git a/spark/v3.5/spark/src/main/java/org/apache/iceberg/spark/ParquetReaderType.java b/spark/v3.5/spark/src/main/java/org/apache/iceberg/spark/ParquetReaderType.java
new file mode 100644
index 000000000000..d9742c048251
--- /dev/null
+++ b/spark/v3.5/spark/src/main/java/org/apache/iceberg/spark/ParquetReaderType.java
@@ -0,0 +1,47 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.iceberg.spark;
+
+import org.apache.iceberg.relocated.com.google.common.base.Preconditions;
+
+/** Enumerates the types of Parquet readers. */
+public enum ParquetReaderType {
+ /** ICEBERG type utilizes the built-in Parquet reader. */
+ ICEBERG,
+
+ /**
+   * COMET type changes the Parquet reader to the Apache DataFusion Comet Parquet reader. The
+   * Comet Parquet reader performs I/O and decompression in the JVM but decodes natively to
+   * improve performance. Additionally, Comet will convert Spark's physical plan into a native
+   * physical plan and execute it natively.
+   *
+   * <p>TODO: Implement {@link org.apache.comet.parquet.SupportsComet} in SparkScan to convert the
+   * Spark physical plan to a native physical plan for native execution.
+ */
+ COMET;
+
+ public static ParquetReaderType fromString(String typeAsString) {
+ Preconditions.checkArgument(typeAsString != null, "Parquet reader type is null");
+ try {
+ return ParquetReaderType.valueOf(typeAsString.toUpperCase());
+ } catch (IllegalArgumentException e) {
+ throw new IllegalArgumentException("Unknown parquet reader type: " + typeAsString);
+ }
+ }
+}
diff --git a/spark/v3.5/spark/src/main/java/org/apache/iceberg/spark/SparkReadConf.java b/spark/v3.5/spark/src/main/java/org/apache/iceberg/spark/SparkReadConf.java
index 67e9d78ada4d..a913a0157f31 100644
--- a/spark/v3.5/spark/src/main/java/org/apache/iceberg/spark/SparkReadConf.java
+++ b/spark/v3.5/spark/src/main/java/org/apache/iceberg/spark/SparkReadConf.java
@@ -355,4 +355,12 @@ public boolean reportColumnStats() {
.defaultValue(SparkSQLProperties.REPORT_COLUMN_STATS_DEFAULT)
.parse();
}
+
+ public ParquetReaderType parquetReaderType() {
+ return confParser
+ .enumConf(ParquetReaderType::fromString)
+ .sessionConf(SparkSQLProperties.PARQUET_READER_TYPE)
+ .defaultValue(SparkSQLProperties.PARQUET_READER_TYPE_DEFAULT)
+ .parse();
+ }
}
diff --git a/spark/v3.5/spark/src/main/java/org/apache/iceberg/spark/SparkSQLProperties.java b/spark/v3.5/spark/src/main/java/org/apache/iceberg/spark/SparkSQLProperties.java
index 9130e63ba97e..d6c16bb0e238 100644
--- a/spark/v3.5/spark/src/main/java/org/apache/iceberg/spark/SparkSQLProperties.java
+++ b/spark/v3.5/spark/src/main/java/org/apache/iceberg/spark/SparkSQLProperties.java
@@ -27,6 +27,9 @@ private SparkSQLProperties() {}
// Controls whether vectorized reads are enabled
public static final String VECTORIZATION_ENABLED = "spark.sql.iceberg.vectorization.enabled";
+ // Controls which Parquet reader implementation to use
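+  // (e.g. SET spark.sql.iceberg.parquet.reader-type = COMET enables the Comet native reader)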
+ public static final String PARQUET_READER_TYPE = "spark.sql.iceberg.parquet.reader-type";
+ public static final ParquetReaderType PARQUET_READER_TYPE_DEFAULT = ParquetReaderType.ICEBERG;
// Controls whether to perform the nullability check during writes
public static final String CHECK_NULLABILITY = "spark.sql.iceberg.check-nullability";
public static final boolean CHECK_NULLABILITY_DEFAULT = true;
diff --git a/spark/v3.5/spark/src/main/java/org/apache/iceberg/spark/data/vectorized/CometColumnReader.java b/spark/v3.5/spark/src/main/java/org/apache/iceberg/spark/data/vectorized/CometColumnReader.java
new file mode 100644
index 000000000000..4794863ab1bf
--- /dev/null
+++ b/spark/v3.5/spark/src/main/java/org/apache/iceberg/spark/data/vectorized/CometColumnReader.java
@@ -0,0 +1,150 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.iceberg.spark.data.vectorized;
+
+import java.io.IOException;
+import java.util.Map;
+import org.apache.comet.parquet.AbstractColumnReader;
+import org.apache.comet.parquet.ColumnReader;
+import org.apache.comet.parquet.TypeUtil;
+import org.apache.comet.parquet.Utils;
+import org.apache.comet.shaded.arrow.c.CometSchemaImporter;
+import org.apache.comet.shaded.arrow.memory.RootAllocator;
+import org.apache.iceberg.parquet.VectorizedReader;
+import org.apache.iceberg.relocated.com.google.common.base.Preconditions;
+import org.apache.iceberg.spark.SparkSchemaUtil;
+import org.apache.iceberg.types.Types;
+import org.apache.parquet.column.ColumnDescriptor;
+import org.apache.parquet.column.page.PageReadStore;
+import org.apache.parquet.column.page.PageReader;
+import org.apache.parquet.hadoop.metadata.ColumnChunkMetaData;
+import org.apache.parquet.hadoop.metadata.ColumnPath;
+import org.apache.spark.sql.types.DataType;
+import org.apache.spark.sql.types.Metadata;
+import org.apache.spark.sql.types.StructField;
+import org.apache.spark.sql.vectorized.ColumnVector;
+
+class CometColumnReader implements VectorizedReader<ColumnVector> {
+ // use the Comet default batch size
+ public static final int DEFAULT_BATCH_SIZE = 8192;
+
+ private final ColumnDescriptor descriptor;
+ private final DataType sparkType;
+
+ // The delegated ColumnReader from Comet side
+ private AbstractColumnReader delegate;
+ private boolean initialized = false;
+ private int batchSize = DEFAULT_BATCH_SIZE;
+ private CometSchemaImporter importer;
+
+ CometColumnReader(DataType sparkType, ColumnDescriptor descriptor) {
+ this.sparkType = sparkType;
+ this.descriptor = descriptor;
+ }
+
+ CometColumnReader(Types.NestedField field) {
+ DataType dataType = SparkSchemaUtil.convert(field.type());
+ StructField structField = new StructField(field.name(), dataType, false, Metadata.empty());
+ this.sparkType = dataType;
+ this.descriptor = TypeUtil.convertToParquet(structField);
+ }
+
+ public AbstractColumnReader delegate() {
+ return delegate;
+ }
+
+ void setDelegate(AbstractColumnReader delegate) {
+ this.delegate = delegate;
+ }
+
+ void setInitialized(boolean initialized) {
+ this.initialized = initialized;
+ }
+
+ public int batchSize() {
+ return batchSize;
+ }
+
+ /**
+   * Initializes or resets this CometColumnReader. This needs to be called for each row group
+   * after readNextRowGroup, so that a new dictionary encoding can be set for each new row group.
+ */
+ public void reset() {
+ if (importer != null) {
+ importer.close();
+ }
+
+ if (delegate != null) {
+ delegate.close();
+ }
+
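+    // recreate the schema importer and delegated reader so the new row group starts from a clean
+    // native state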
+ this.importer = new CometSchemaImporter(new RootAllocator());
+ this.delegate = Utils.getColumnReader(sparkType, descriptor, importer, batchSize, false, false);
+ this.initialized = true;
+ }
+
+ public ColumnDescriptor descriptor() {
+ return descriptor;
+ }
+
+ /** Returns the Spark data type for this column. */
+ public DataType sparkType() {
+ return sparkType;
+ }
+
+ /**
+   * Sets the page reader for the current column chunk.
+   *
+   * <p>NOTE: this should be called before reading a new Parquet column chunk, and after {@link
+ * CometColumnReader#reset} is called.
+ */
+ public void setPageReader(PageReader pageReader) throws IOException {
+ Preconditions.checkState(initialized, "Invalid state: 'reset' should be called first");
+ ((ColumnReader) delegate).setPageReader(pageReader);
+ }
+
+ @Override
+ public void close() {
+ // close resources on native side
+ if (importer != null) {
+ importer.close();
+ }
+
+ if (delegate != null) {
+ delegate.close();
+ }
+ }
+
+ @Override
+ public void setBatchSize(int size) {
+ this.batchSize = size;
+ }
+
+ @Override
+ public void setRowGroupInfo(
+      PageReadStore pageReadStore, Map<ColumnPath, ColumnChunkMetaData> map, long size) {
+ throw new UnsupportedOperationException("Not supported");
+ }
+
+ @Override
+ public ColumnVector read(ColumnVector reuse, int numRowsToRead) {
+ throw new UnsupportedOperationException("Not supported");
+ }
+}
diff --git a/spark/v3.5/spark/src/main/java/org/apache/iceberg/spark/data/vectorized/CometColumnarBatchReader.java b/spark/v3.5/spark/src/main/java/org/apache/iceberg/spark/data/vectorized/CometColumnarBatchReader.java
new file mode 100644
index 000000000000..1440e5d1d3f7
--- /dev/null
+++ b/spark/v3.5/spark/src/main/java/org/apache/iceberg/spark/data/vectorized/CometColumnarBatchReader.java
@@ -0,0 +1,203 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.iceberg.spark.data.vectorized;
+
+import java.io.IOException;
+import java.io.UncheckedIOException;
+import java.util.List;
+import java.util.Map;
+import org.apache.comet.parquet.AbstractColumnReader;
+import org.apache.comet.parquet.BatchReader;
+import org.apache.iceberg.Schema;
+import org.apache.iceberg.data.DeleteFilter;
+import org.apache.iceberg.parquet.VectorizedReader;
+import org.apache.iceberg.relocated.com.google.common.base.Preconditions;
+import org.apache.iceberg.spark.SparkSchemaUtil;
+import org.apache.iceberg.util.Pair;
+import org.apache.parquet.column.page.PageReadStore;
+import org.apache.parquet.hadoop.metadata.ColumnChunkMetaData;
+import org.apache.parquet.hadoop.metadata.ColumnPath;
+import org.apache.spark.sql.catalyst.InternalRow;
+import org.apache.spark.sql.vectorized.ColumnVector;
+import org.apache.spark.sql.vectorized.ColumnarBatch;
+
+/**
+ * {@link VectorizedReader} that returns Spark's {@link ColumnarBatch} to support Spark's vectorized
+ * read path. The {@link ColumnarBatch} returned is created by passing in the Arrow vectors
+ * populated via delegated read calls to {@link CometColumnReader VectorReader(s)}.
+ */
+@SuppressWarnings("checkstyle:VisibilityModifier")
+class CometColumnarBatchReader implements VectorizedReader<ColumnarBatch> {
+
+ private final CometColumnReader[] readers;
+ private final boolean hasIsDeletedColumn;
+
+ // The delegated BatchReader on the Comet side does the real work of loading a batch of rows.
+ // The Comet BatchReader contains an array of ColumnReader. There is no need to explicitly call
+ // ColumnReader.readBatch; instead, BatchReader.nextBatch will be called, which underneath calls
+ // ColumnReader.readBatch. The only exception is DeleteColumnReader, because at the time of
+ // calling BatchReader.nextBatch, the isDeleted value is not yet available, so
+ // DeleteColumnReader.readBatch must be called explicitly later, after the isDeleted value is
+ // available.
+ private final BatchReader delegate;
+  private DeleteFilter<InternalRow> deletes = null;
+ private long rowStartPosInBatch = 0;
+
+  CometColumnarBatchReader(List<VectorizedReader<?>> readers, Schema schema) {
+ this.readers =
+ readers.stream().map(CometColumnReader.class::cast).toArray(CometColumnReader[]::new);
+ this.hasIsDeletedColumn =
+ readers.stream().anyMatch(reader -> reader instanceof CometDeleteColumnReader);
+
+ AbstractColumnReader[] abstractColumnReaders = new AbstractColumnReader[readers.size()];
+ this.delegate = new BatchReader(abstractColumnReaders);
+ delegate.setSparkSchema(SparkSchemaUtil.convert(schema));
+ }
+
+ @Override
+ public void setRowGroupInfo(
+      PageReadStore pageStore, Map<ColumnPath, ColumnChunkMetaData> metaData, long rowPosition) {
+ setRowGroupInfo(pageStore, metaData);
+ }
+
+ @Override
+ public void setRowGroupInfo(
+      PageReadStore pageStore, Map<ColumnPath, ColumnChunkMetaData> metaData) {
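+    // constant, position, and delete readers are backed by metadata rather than Parquet pages,
+    // so only regular data column readers are reset and bound to the new row group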
+ for (int i = 0; i < readers.length; i++) {
+ try {
+ if (!(readers[i] instanceof CometConstantColumnReader)
+ && !(readers[i] instanceof CometPositionColumnReader)
+ && !(readers[i] instanceof CometDeleteColumnReader)) {
+ readers[i].reset();
+ readers[i].setPageReader(pageStore.getPageReader(readers[i].descriptor()));
+ }
+ } catch (IOException e) {
+ throw new UncheckedIOException("Failed to setRowGroupInfo for Comet vectorization", e);
+ }
+ }
+
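+    // refresh the delegate BatchReader's column readers so nextBatch drives the new delegates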
+ for (int i = 0; i < readers.length; i++) {
+ delegate.getColumnReaders()[i] = this.readers[i].delegate();
+ }
+
+ this.rowStartPosInBatch =
+ pageStore
+ .getRowIndexOffset()
+ .orElseThrow(
+ () ->
+ new IllegalArgumentException(
+ "PageReadStore does not contain row index offset"));
+ }
+
+  public void setDeleteFilter(DeleteFilter<InternalRow> deleteFilter) {
+ this.deletes = deleteFilter;
+ }
+
+ @Override
+ public final ColumnarBatch read(ColumnarBatch reuse, int numRowsToRead) {
+ ColumnarBatch columnarBatch = new ColumnBatchLoader(numRowsToRead).loadDataToColumnBatch();
+ rowStartPosInBatch += numRowsToRead;
+ return columnarBatch;
+ }
+
+ @Override
+ public void setBatchSize(int batchSize) {
+ for (CometColumnReader reader : readers) {
+ if (reader != null) {
+ reader.setBatchSize(batchSize);
+ }
+ }
+ }
+
+ @Override
+ public void close() {
+ for (CometColumnReader reader : readers) {
+ if (reader != null) {
+ reader.close();
+ }
+ }
+ }
+
+ private class ColumnBatchLoader {
+ private final int batchSize;
+
+ ColumnBatchLoader(int numRowsToRead) {
+ Preconditions.checkArgument(
+ numRowsToRead > 0, "Invalid number of rows to read: %s", numRowsToRead);
+ this.batchSize = numRowsToRead;
+ }
+
+ ColumnarBatch loadDataToColumnBatch() {
+ ColumnVector[] vectors = readDataToColumnVectors();
+ int numLiveRows = batchSize;
+
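+      // with a projected _deleted column all rows are kept and flagged; otherwise deleted rows
+      // are filtered out by wrapping each vector with a row id mapping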
+ if (hasIsDeletedColumn) {
+ boolean[] isDeleted = buildIsDeleted(vectors);
+ readDeletedColumn(vectors, isDeleted);
+ } else {
+        Pair<int[], Integer> pair = buildRowIdMapping(vectors);
+ if (pair != null) {
+ int[] rowIdMapping = pair.first();
+ numLiveRows = pair.second();
+ for (int i = 0; i < vectors.length; i++) {
+ vectors[i] = new ColumnVectorWithFilter(vectors[i], rowIdMapping);
+ }
+ }
+ }
+
+ if (deletes != null && deletes.hasEqDeletes()) {
+ vectors = ColumnarBatchUtil.removeExtraColumns(deletes, vectors);
+ }
+
+ ColumnarBatch batch = new ColumnarBatch(vectors);
+ batch.setNumRows(numLiveRows);
+ return batch;
+ }
+
+ private boolean[] buildIsDeleted(ColumnVector[] vectors) {
+ return ColumnarBatchUtil.buildIsDeleted(vectors, deletes, rowStartPosInBatch, batchSize);
+ }
+
+    private Pair<int[], Integer> buildRowIdMapping(ColumnVector[] vectors) {
+ return ColumnarBatchUtil.buildRowIdMapping(vectors, deletes, rowStartPosInBatch, batchSize);
+ }
+
+ ColumnVector[] readDataToColumnVectors() {
+ ColumnVector[] columnVectors = new ColumnVector[readers.length];
+ // Fetch rows for all readers in the delegate
+ delegate.nextBatch(batchSize);
+ for (int i = 0; i < readers.length; i++) {
+ columnVectors[i] = readers[i].delegate().currentBatch();
+ }
+
+ return columnVectors;
+ }
+
+ void readDeletedColumn(ColumnVector[] columnVectors, boolean[] isDeleted) {
+ for (int i = 0; i < readers.length; i++) {
+ if (readers[i] instanceof CometDeleteColumnReader) {
+ CometDeleteColumnReader deleteColumnReader = new CometDeleteColumnReader<>(isDeleted);
+ deleteColumnReader.setBatchSize(batchSize);
+ deleteColumnReader.delegate().readBatch(batchSize);
+ columnVectors[i] = deleteColumnReader.delegate().currentBatch();
+ }
+ }
+ }
+ }
+}
diff --git a/spark/v3.5/spark/src/main/java/org/apache/iceberg/spark/data/vectorized/CometConstantColumnReader.java b/spark/v3.5/spark/src/main/java/org/apache/iceberg/spark/data/vectorized/CometConstantColumnReader.java
new file mode 100644
index 000000000000..047c96314b13
--- /dev/null
+++ b/spark/v3.5/spark/src/main/java/org/apache/iceberg/spark/data/vectorized/CometConstantColumnReader.java
@@ -0,0 +1,65 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.iceberg.spark.data.vectorized;
+
+import java.math.BigDecimal;
+import java.nio.ByteBuffer;
+import org.apache.comet.parquet.ConstantColumnReader;
+import org.apache.iceberg.types.Types;
+import org.apache.spark.sql.types.DataType;
+import org.apache.spark.sql.types.DataTypes;
+import org.apache.spark.sql.types.Decimal;
+import org.apache.spark.sql.types.DecimalType;
+import org.apache.spark.unsafe.types.UTF8String;
+
+class CometConstantColumnReader<T> extends CometColumnReader {
+
+ CometConstantColumnReader(T value, Types.NestedField field) {
+ super(field);
+ // use delegate to set constant value on the native side to be consumed by native execution.
+ setDelegate(
+ new ConstantColumnReader(sparkType(), descriptor(), convertToSparkValue(value), false));
+ }
+
+ @Override
+ public void setBatchSize(int batchSize) {
+ super.setBatchSize(batchSize);
+ delegate().setBatchSize(batchSize);
+ setInitialized(true);
+ }
+
+ private Object convertToSparkValue(T value) {
+ DataType dataType = sparkType();
+ // Match the value to Spark internal type if necessary
+ if (dataType == DataTypes.StringType && value instanceof String) {
+ // the internal type for StringType is UTF8String
+ return UTF8String.fromString((String) value);
+ } else if (dataType instanceof DecimalType && value instanceof BigDecimal) {
+ // the internal type for DecimalType is Decimal
+ return Decimal.apply((BigDecimal) value);
+ } else if (dataType == DataTypes.BinaryType && value instanceof ByteBuffer) {
+      // the internal type for BinaryType is byte[]
+      // Iceberg default values should always use a heap ByteBuffer, so calling ByteBuffer.array()
+      // should be safe.
+ return ((ByteBuffer) value).array();
+ } else {
+ return value;
+ }
+ }
+}
diff --git a/spark/v3.5/spark/src/main/java/org/apache/iceberg/spark/data/vectorized/CometDeleteColumnReader.java b/spark/v3.5/spark/src/main/java/org/apache/iceberg/spark/data/vectorized/CometDeleteColumnReader.java
new file mode 100644
index 000000000000..6235bfe4865e
--- /dev/null
+++ b/spark/v3.5/spark/src/main/java/org/apache/iceberg/spark/data/vectorized/CometDeleteColumnReader.java
@@ -0,0 +1,75 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.iceberg.spark.data.vectorized;
+
+import org.apache.comet.parquet.MetadataColumnReader;
+import org.apache.comet.parquet.Native;
+import org.apache.comet.parquet.TypeUtil;
+import org.apache.iceberg.MetadataColumns;
+import org.apache.iceberg.types.Types;
+import org.apache.spark.sql.types.DataTypes;
+import org.apache.spark.sql.types.Metadata;
+import org.apache.spark.sql.types.StructField;
+
+class CometDeleteColumnReader<T> extends CometColumnReader {
+ CometDeleteColumnReader(Types.NestedField field) {
+ super(field);
+ setDelegate(new DeleteColumnReader());
+ }
+
+ CometDeleteColumnReader(boolean[] isDeleted) {
+ super(MetadataColumns.IS_DELETED);
+ setDelegate(new DeleteColumnReader(isDeleted));
+ }
+
+ @Override
+ public void setBatchSize(int batchSize) {
+ super.setBatchSize(batchSize);
+ delegate().setBatchSize(batchSize);
+ setInitialized(true);
+ }
+
+ private static class DeleteColumnReader extends MetadataColumnReader {
+ private boolean[] isDeleted;
+
+ DeleteColumnReader() {
+ super(
+ DataTypes.BooleanType,
+ TypeUtil.convertToParquet(
+ new StructField("_deleted", DataTypes.BooleanType, false, Metadata.empty())),
+ false /* useDecimal128 = false */,
+ false /* isConstant = false */);
+ this.isDeleted = new boolean[0];
+ }
+
+ DeleteColumnReader(boolean[] isDeleted) {
+ this();
+ this.isDeleted = isDeleted;
+ }
+
+ @Override
+ public void readBatch(int total) {
+ Native.resetBatch(nativeHandle);
+ // set isDeleted on the native side to be consumed by native execution
+ Native.setIsDeleted(nativeHandle, isDeleted);
+
+ super.readBatch(total);
+ }
+ }
+}
diff --git a/spark/v3.5/spark/src/main/java/org/apache/iceberg/spark/data/vectorized/CometPositionColumnReader.java b/spark/v3.5/spark/src/main/java/org/apache/iceberg/spark/data/vectorized/CometPositionColumnReader.java
new file mode 100644
index 000000000000..bcc0e514c28d
--- /dev/null
+++ b/spark/v3.5/spark/src/main/java/org/apache/iceberg/spark/data/vectorized/CometPositionColumnReader.java
@@ -0,0 +1,62 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.iceberg.spark.data.vectorized;
+
+import org.apache.comet.parquet.MetadataColumnReader;
+import org.apache.comet.parquet.Native;
+import org.apache.iceberg.types.Types;
+import org.apache.parquet.column.ColumnDescriptor;
+import org.apache.spark.sql.types.DataTypes;
+
+class CometPositionColumnReader extends CometColumnReader {
+ CometPositionColumnReader(Types.NestedField field) {
+ super(field);
+ setDelegate(new PositionColumnReader(descriptor()));
+ }
+
+ @Override
+ public void setBatchSize(int batchSize) {
+ super.setBatchSize(batchSize);
+ delegate().setBatchSize(batchSize);
+ setInitialized(true);
+ }
+
+ private static class PositionColumnReader extends MetadataColumnReader {
+    /** The current row position, used to populate the values of the position metadata column. */
+ private long position;
+
+ PositionColumnReader(ColumnDescriptor descriptor) {
+ super(
+ DataTypes.LongType,
+ descriptor,
+ false /* useDecimal128 = false */,
+ false /* isConstant = false */);
+ }
+
+ @Override
+ public void readBatch(int total) {
+ Native.resetBatch(nativeHandle);
+ // set position on the native side to be consumed by native execution
+ Native.setPosition(nativeHandle, position, total);
+ position += total;
+
+ super.readBatch(total);
+ }
+ }
+}
diff --git a/spark/v3.5/spark/src/main/java/org/apache/iceberg/spark/data/vectorized/CometVectorizedReaderBuilder.java b/spark/v3.5/spark/src/main/java/org/apache/iceberg/spark/data/vectorized/CometVectorizedReaderBuilder.java
new file mode 100644
index 000000000000..d36f1a727477
--- /dev/null
+++ b/spark/v3.5/spark/src/main/java/org/apache/iceberg/spark/data/vectorized/CometVectorizedReaderBuilder.java
@@ -0,0 +1,147 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.iceberg.spark.data.vectorized;
+
+import java.util.List;
+import java.util.Map;
+import java.util.function.Function;
+import java.util.stream.IntStream;
+import org.apache.iceberg.MetadataColumns;
+import org.apache.iceberg.Schema;
+import org.apache.iceberg.data.DeleteFilter;
+import org.apache.iceberg.parquet.TypeWithSchemaVisitor;
+import org.apache.iceberg.parquet.VectorizedReader;
+import org.apache.iceberg.relocated.com.google.common.collect.ImmutableList;
+import org.apache.iceberg.relocated.com.google.common.collect.Lists;
+import org.apache.iceberg.relocated.com.google.common.collect.Maps;
+import org.apache.iceberg.spark.SparkSchemaUtil;
+import org.apache.iceberg.types.Types;
+import org.apache.parquet.column.ColumnDescriptor;
+import org.apache.parquet.schema.GroupType;
+import org.apache.parquet.schema.MessageType;
+import org.apache.parquet.schema.PrimitiveType;
+import org.apache.parquet.schema.Type;
+import org.apache.spark.sql.catalyst.InternalRow;
+
+class CometVectorizedReaderBuilder extends TypeWithSchemaVisitor<VectorizedReader<?>> {
+
+ private final MessageType parquetSchema;
+ private final Schema icebergSchema;
+  private final Map<Integer, ?> idToConstant;
+  private final Function<List<VectorizedReader<?>>, VectorizedReader<?>> readerFactory;
+  private final DeleteFilter<InternalRow> deleteFilter;
+
+ CometVectorizedReaderBuilder(
+ Schema expectedSchema,
+ MessageType parquetSchema,
+      Map<Integer, ?> idToConstant,
+      Function<List<VectorizedReader<?>>, VectorizedReader<?>> readerFactory,
+      DeleteFilter<InternalRow> deleteFilter) {
+ this.parquetSchema = parquetSchema;
+ this.icebergSchema = expectedSchema;
+ this.idToConstant = idToConstant;
+ this.readerFactory = readerFactory;
+ this.deleteFilter = deleteFilter;
+ }
+
+ @Override
+  public VectorizedReader<?> message(
+      Types.StructType expected, MessageType message, List<VectorizedReader<?>> fieldReaders) {
+ GroupType groupType = message.asGroupType();
+    Map<Integer, VectorizedReader<?>> readersById = Maps.newHashMap();
+    List<Type> fields = groupType.getFields();
+
+ IntStream.range(0, fields.size())
+ .filter(pos -> fields.get(pos).getId() != null)
+ .forEach(pos -> readersById.put(fields.get(pos).getId().intValue(), fieldReaders.get(pos)));
+
+    List<Types.NestedField> icebergFields =
+ expected != null ? expected.fields() : ImmutableList.of();
+
+    List<VectorizedReader<?>> reorderedFields =
+ Lists.newArrayListWithExpectedSize(icebergFields.size());
+
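+    // reorder readers to match the expected Iceberg schema, substituting constant and metadata
+    // column readers for fields that have no corresponding Parquet column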
+ for (Types.NestedField field : icebergFields) {
+ int id = field.fieldId();
+      VectorizedReader<?> reader = readersById.get(id);
+ if (idToConstant.containsKey(id)) {
+        CometConstantColumnReader<?> constantReader =
+ new CometConstantColumnReader<>(idToConstant.get(id), field);
+ reorderedFields.add(constantReader);
+ } else if (id == MetadataColumns.ROW_POSITION.fieldId()) {
+ reorderedFields.add(new CometPositionColumnReader(field));
+ } else if (id == MetadataColumns.IS_DELETED.fieldId()) {
+ CometColumnReader deleteReader = new CometDeleteColumnReader<>(field);
+ reorderedFields.add(deleteReader);
+ } else if (reader != null) {
+ reorderedFields.add(reader);
+ } else if (field.initialDefault() != null) {
+ CometColumnReader constantReader =
+ new CometConstantColumnReader<>(field.initialDefault(), field);
+ reorderedFields.add(constantReader);
+ } else if (field.isOptional()) {
+ CometColumnReader constantReader = new CometConstantColumnReader<>(null, field);
+ reorderedFields.add(constantReader);
+ } else {
+ throw new IllegalArgumentException(
+ String.format("Missing required field: %s", field.name()));
+ }
+ }
+ return vectorizedReader(reorderedFields);
+ }
+
+  protected VectorizedReader<?> vectorizedReader(List<VectorizedReader<?>> reorderedFields) {
+    VectorizedReader<?> reader = readerFactory.apply(reorderedFields);
+ if (deleteFilter != null) {
+ ((CometColumnarBatchReader) reader).setDeleteFilter(deleteFilter);
+ }
+ return reader;
+ }
+
+ @Override
+  public VectorizedReader<?> struct(
+      Types.StructType expected, GroupType groupType, List<VectorizedReader<?>> fieldReaders) {
+ if (expected != null) {
+ throw new UnsupportedOperationException(
+ "Vectorized reads are not supported yet for struct fields");
+ }
+ return null;
+ }
+
+ @Override
+  public VectorizedReader<?> primitive(
+ org.apache.iceberg.types.Type.PrimitiveType expected, PrimitiveType primitive) {
+
+ if (primitive.getId() == null) {
+ return null;
+ }
+ int parquetFieldId = primitive.getId().intValue();
+ ColumnDescriptor desc = parquetSchema.getColumnDescription(currentPath());
+ // Nested types not yet supported for vectorized reads
+ if (desc.getMaxRepetitionLevel() > 0) {
+ return null;
+ }
+ Types.NestedField icebergField = icebergSchema.findField(parquetFieldId);
+ if (icebergField == null) {
+ return null;
+ }
+
+ return new CometColumnReader(SparkSchemaUtil.convert(icebergField.type()), desc);
+ }
+}
diff --git a/spark/v3.5/spark/src/main/java/org/apache/iceberg/spark/data/vectorized/VectorizedSparkParquetReaders.java b/spark/v3.5/spark/src/main/java/org/apache/iceberg/spark/data/vectorized/VectorizedSparkParquetReaders.java
index 636ad3be7dcc..b523bc5bff11 100644
--- a/spark/v3.5/spark/src/main/java/org/apache/iceberg/spark/data/vectorized/VectorizedSparkParquetReaders.java
+++ b/spark/v3.5/spark/src/main/java/org/apache/iceberg/spark/data/vectorized/VectorizedSparkParquetReaders.java
@@ -70,6 +70,23 @@ public static ColumnarBatchReader buildReader(
deleteFilter));
}
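+  /**
+   * Builds a Comet-based batch reader by visiting the expected schema against the Parquet file
+   * schema with {@link CometVectorizedReaderBuilder}.
+   */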
+ public static CometColumnarBatchReader buildCometReader(
+ Schema expectedSchema,
+ MessageType fileSchema,
+      Map<Integer, ?> idToConstant,
+      DeleteFilter<InternalRow> deleteFilter) {
+ return (CometColumnarBatchReader)
+ TypeWithSchemaVisitor.visit(
+ expectedSchema.asStruct(),
+ fileSchema,
+ new CometVectorizedReaderBuilder(
+ expectedSchema,
+ fileSchema,
+ idToConstant,
+ readers -> new CometColumnarBatchReader(readers, expectedSchema),
+ deleteFilter));
+ }
+
// enables unsafe memory access to avoid costly checks to see if index is within bounds
// as long as it is not configured explicitly (see BoundsChecking in Arrow)
private static void enableUnsafeMemoryAccess() {
diff --git a/spark/v3.5/spark/src/main/java/org/apache/iceberg/spark/source/BaseBatchReader.java b/spark/v3.5/spark/src/main/java/org/apache/iceberg/spark/source/BaseBatchReader.java
index c05b694a60dc..780e1750a52e 100644
--- a/spark/v3.5/spark/src/main/java/org/apache/iceberg/spark/source/BaseBatchReader.java
+++ b/spark/v3.5/spark/src/main/java/org/apache/iceberg/spark/source/BaseBatchReader.java
@@ -32,13 +32,17 @@
import org.apache.iceberg.orc.ORC;
import org.apache.iceberg.parquet.Parquet;
import org.apache.iceberg.relocated.com.google.common.collect.Sets;
+import org.apache.iceberg.spark.OrcBatchReadConf;
+import org.apache.iceberg.spark.ParquetBatchReadConf;
+import org.apache.iceberg.spark.ParquetReaderType;
import org.apache.iceberg.spark.data.vectorized.VectorizedSparkOrcReaders;
import org.apache.iceberg.spark.data.vectorized.VectorizedSparkParquetReaders;
import org.apache.iceberg.types.TypeUtil;
import org.apache.spark.sql.vectorized.ColumnarBatch;
abstract class BaseBatchReader<T extends ScanTask> extends BaseReader<ColumnarBatch, T> {
- private final int batchSize;
+ private final ParquetBatchReadConf parquetConf;
+ private final OrcBatchReadConf orcConf;
BaseBatchReader(
Table table,
@@ -46,9 +50,11 @@ abstract class BaseBatchReader<T extends ScanTask> extends BaseReader<ColumnarBatch, T> {
Schema tableSchema,
Schema expectedSchema,
boolean caseSensitive,
- int size) {
+ ParquetBatchReadConf parquetConf,
+ OrcBatchReadConf orcConf) {
super(table, taskGroup, tableSchema, expectedSchema, caseSensitive);
- this.batchSize = size;
+ this.parquetConf = parquetConf;
+ this.orcConf = orcConf;
}
protected CloseableIterable<ColumnarBatch> newBatchIterable(
@@ -86,10 +92,16 @@ private CloseableIterable<ColumnarBatch> newParquetIterable(
.project(requiredSchema)
.split(start, length)
.createBatchedReaderFunc(
- fileSchema ->
- VectorizedSparkParquetReaders.buildReader(
- requiredSchema, fileSchema, idToConstant, deleteFilter))
- .recordsPerBatch(batchSize)
+ fileSchema -> {
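+              // use the Comet native Parquet decoder when the Comet reader type is selected,
+              // otherwise fall back to the built-in Iceberg vectorized reader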
+ if (parquetConf.readerType() == ParquetReaderType.COMET) {
+ return VectorizedSparkParquetReaders.buildCometReader(
+ requiredSchema, fileSchema, idToConstant, deleteFilter);
+ } else {
+ return VectorizedSparkParquetReaders.buildReader(
+ requiredSchema, fileSchema, idToConstant, deleteFilter);
+ }
+ })
+ .recordsPerBatch(parquetConf.batchSize())
.filter(residual)
.caseSensitive(caseSensitive())
// Spark eagerly consumes the batches. So the underlying memory allocated could be reused
@@ -119,7 +131,7 @@ private CloseableIterable<ColumnarBatch> newOrcIterable(
.createBatchedReaderFunc(
fileSchema ->
VectorizedSparkOrcReaders.buildReader(expectedSchema(), fileSchema, idToConstant))
- .recordsPerBatch(batchSize)
+ .recordsPerBatch(orcConf.batchSize())
.filter(residual)
.caseSensitive(caseSensitive())
.withNameMapping(nameMapping())
diff --git a/spark/v3.5/spark/src/main/java/org/apache/iceberg/spark/source/BatchDataReader.java b/spark/v3.5/spark/src/main/java/org/apache/iceberg/spark/source/BatchDataReader.java
index f45c152203ee..e96cbaf411ee 100644
--- a/spark/v3.5/spark/src/main/java/org/apache/iceberg/spark/source/BatchDataReader.java
+++ b/spark/v3.5/spark/src/main/java/org/apache/iceberg/spark/source/BatchDataReader.java
@@ -28,6 +28,8 @@
import org.apache.iceberg.io.CloseableIterator;
import org.apache.iceberg.io.InputFile;
import org.apache.iceberg.relocated.com.google.common.base.Preconditions;
+import org.apache.iceberg.spark.OrcBatchReadConf;
+import org.apache.iceberg.spark.ParquetBatchReadConf;
import org.apache.iceberg.spark.source.metrics.TaskNumDeletes;
import org.apache.iceberg.spark.source.metrics.TaskNumSplits;
import org.apache.iceberg.util.SnapshotUtil;
@@ -45,14 +47,18 @@ class BatchDataReader extends BaseBatchReader<FileScanTask>
private final long numSplits;
- BatchDataReader(SparkInputPartition partition, int batchSize) {
+ BatchDataReader(
+ SparkInputPartition partition,
+ ParquetBatchReadConf parquetBatchReadConf,
+ OrcBatchReadConf orcBatchReadConf) {
this(
partition.table(),
partition.taskGroup(),
SnapshotUtil.schemaFor(partition.table(), partition.branch()),
partition.expectedSchema(),
partition.isCaseSensitive(),
- batchSize);
+ parquetBatchReadConf,
+ orcBatchReadConf);
}
BatchDataReader(
@@ -61,8 +67,9 @@ class BatchDataReader extends BaseBatchReader<FileScanTask>
Schema tableSchema,
Schema expectedSchema,
boolean caseSensitive,
- int size) {
- super(table, taskGroup, tableSchema, expectedSchema, caseSensitive, size);
+ ParquetBatchReadConf parquetConf,
+ OrcBatchReadConf orcConf) {
+ super(table, taskGroup, tableSchema, expectedSchema, caseSensitive, parquetConf, orcConf);
numSplits = taskGroup.tasks().size();
LOG.debug("Reading {} file split(s) for table {}", numSplits, table.name());
diff --git a/spark/v3.5/spark/src/main/java/org/apache/iceberg/spark/source/SparkBatch.java b/spark/v3.5/spark/src/main/java/org/apache/iceberg/spark/source/SparkBatch.java
index fd6783f3e1f7..11f054b11710 100644
--- a/spark/v3.5/spark/src/main/java/org/apache/iceberg/spark/source/SparkBatch.java
+++ b/spark/v3.5/spark/src/main/java/org/apache/iceberg/spark/source/SparkBatch.java
@@ -28,8 +28,14 @@
import org.apache.iceberg.Schema;
import org.apache.iceberg.SchemaParser;
import org.apache.iceberg.Table;
+import org.apache.iceberg.spark.ImmutableOrcBatchReadConf;
+import org.apache.iceberg.spark.ImmutableParquetBatchReadConf;
+import org.apache.iceberg.spark.OrcBatchReadConf;
+import org.apache.iceberg.spark.ParquetBatchReadConf;
+import org.apache.iceberg.spark.ParquetReaderType;
import org.apache.iceberg.spark.SparkReadConf;
import org.apache.iceberg.spark.SparkUtil;
+import org.apache.iceberg.types.Type;
import org.apache.iceberg.types.Types;
import org.apache.spark.api.java.JavaSparkContext;
import org.apache.spark.broadcast.Broadcast;
@@ -113,19 +119,31 @@ private String[][] computePreferredLocations() {
@Override
public PartitionReaderFactory createReaderFactory() {
- if (useParquetBatchReads()) {
- int batchSize = readConf.parquetBatchSize();
- return new SparkColumnarReaderFactory(batchSize);
+ if (useCometBatchReads()) {
+ return new SparkColumnarReaderFactory(parquetBatchReadConf(ParquetReaderType.COMET));
+
+ } else if (useParquetBatchReads()) {
+ return new SparkColumnarReaderFactory(parquetBatchReadConf(ParquetReaderType.ICEBERG));
} else if (useOrcBatchReads()) {
- int batchSize = readConf.orcBatchSize();
- return new SparkColumnarReaderFactory(batchSize);
+ return new SparkColumnarReaderFactory(orcBatchReadConf());
} else {
return new SparkRowReaderFactory();
}
}
+ private ParquetBatchReadConf parquetBatchReadConf(ParquetReaderType readerType) {
+ return ImmutableParquetBatchReadConf.builder()
+ .batchSize(readConf.parquetBatchSize())
+ .readerType(readerType)
+ .build();
+ }
+
+ private OrcBatchReadConf orcBatchReadConf() {
+ return ImmutableOrcBatchReadConf.builder().batchSize(readConf.orcBatchSize()).build();
+ }
+
// conditions for using Parquet batch reads:
// - Parquet vectorization is enabled
// - only primitives or metadata columns are projected
@@ -154,6 +172,17 @@ private boolean supportsParquetBatchReads(Types.NestedField field) {
return field.type().isPrimitiveType() || MetadataColumns.isMetadataColumn(field.fieldId());
}
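+  // conditions for using Comet batch reads:
+  // - Parquet vectorization is enabled and the Parquet reader type is COMET
+  // - only primitive columns, excluding UUID, are projected
+  // - all tasks satisfy the Parquet batch read conditions above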
+ private boolean useCometBatchReads() {
+ return readConf.parquetVectorizationEnabled()
+ && readConf.parquetReaderType() == ParquetReaderType.COMET
+ && expectedSchema.columns().stream().allMatch(this::supportsCometBatchReads)
+ && taskGroups.stream().allMatch(this::supportsParquetBatchReads);
+ }
+
+ private boolean supportsCometBatchReads(Types.NestedField field) {
+ return field.type().isPrimitiveType() && !field.type().typeId().equals(Type.TypeID.UUID);
+ }
+
// conditions for using ORC batch reads:
// - ORC vectorization is enabled
// - all tasks are of type FileScanTask and read only ORC files with no delete files
diff --git a/spark/v3.5/spark/src/main/java/org/apache/iceberg/spark/source/SparkColumnarReaderFactory.java b/spark/v3.5/spark/src/main/java/org/apache/iceberg/spark/source/SparkColumnarReaderFactory.java
index 655e20a50e11..887b84fb617a 100644
--- a/spark/v3.5/spark/src/main/java/org/apache/iceberg/spark/source/SparkColumnarReaderFactory.java
+++ b/spark/v3.5/spark/src/main/java/org/apache/iceberg/spark/source/SparkColumnarReaderFactory.java
@@ -20,6 +20,8 @@
import org.apache.iceberg.FileScanTask;
import org.apache.iceberg.relocated.com.google.common.base.Preconditions;
+import org.apache.iceberg.spark.OrcBatchReadConf;
+import org.apache.iceberg.spark.ParquetBatchReadConf;
import org.apache.spark.sql.catalyst.InternalRow;
import org.apache.spark.sql.connector.read.InputPartition;
import org.apache.spark.sql.connector.read.PartitionReader;
@@ -27,11 +29,17 @@
import org.apache.spark.sql.vectorized.ColumnarBatch;
class SparkColumnarReaderFactory implements PartitionReaderFactory {
- private final int batchSize;
+ private final ParquetBatchReadConf parquetConf;
+ private final OrcBatchReadConf orcConf;
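+  // exactly one of parquetConf and orcConf is set, depending on the file format being read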
- SparkColumnarReaderFactory(int batchSize) {
- Preconditions.checkArgument(batchSize > 1, "Batch size must be > 1");
- this.batchSize = batchSize;
+ SparkColumnarReaderFactory(ParquetBatchReadConf conf) {
+ this.parquetConf = conf;
+ this.orcConf = null;
+ }
+
+ SparkColumnarReaderFactory(OrcBatchReadConf conf) {
+ this.orcConf = conf;
+ this.parquetConf = null;
}
@Override
@@ -49,8 +57,7 @@ public PartitionReader<ColumnarBatch> createColumnarReader(InputPartition inputP
SparkInputPartition partition = (SparkInputPartition) inputPartition;
if (partition.allTasksOfType(FileScanTask.class)) {
- return new BatchDataReader(partition, batchSize);
-
+ return new BatchDataReader(partition, parquetConf, orcConf);
} else {
throw new UnsupportedOperationException(
"Unsupported task group for columnar reads: " + partition.taskGroup());
diff --git a/spark/v3.5/spark/src/test/java/org/apache/iceberg/spark/source/TestSparkReaderDeletes.java b/spark/v3.5/spark/src/test/java/org/apache/iceberg/spark/source/TestSparkReaderDeletes.java
index d47cf2512916..4f1cef5d373a 100644
--- a/spark/v3.5/spark/src/test/java/org/apache/iceberg/spark/source/TestSparkReaderDeletes.java
+++ b/spark/v3.5/spark/src/test/java/org/apache/iceberg/spark/source/TestSparkReaderDeletes.java
@@ -70,6 +70,9 @@
import org.apache.iceberg.relocated.com.google.common.collect.ImmutableMap;
import org.apache.iceberg.relocated.com.google.common.collect.Lists;
import org.apache.iceberg.relocated.com.google.common.collect.Sets;
+import org.apache.iceberg.spark.ImmutableParquetBatchReadConf;
+import org.apache.iceberg.spark.ParquetBatchReadConf;
+import org.apache.iceberg.spark.ParquetReaderType;
import org.apache.iceberg.spark.SparkSchemaUtil;
import org.apache.iceberg.spark.SparkStructLike;
import org.apache.iceberg.spark.data.RandomData;
@@ -664,11 +667,23 @@ public void testEqualityDeleteWithDifferentScanAndDeleteColumns() throws IOExcep
TableProperties.SPLIT_LOOKBACK_DEFAULT,
TableProperties.SPLIT_OPEN_FILE_COST_DEFAULT);
+ ParquetBatchReadConf conf =
+ ImmutableParquetBatchReadConf.builder()
+ .batchSize(7)
+ .readerType(ParquetReaderType.ICEBERG)
+ .build();
+
for (CombinedScanTask task : tasks) {
try (BatchDataReader reader =
new BatchDataReader(
// expected column is id, while the equality filter column is dt
- dateTable, task, dateTable.schema(), dateTable.schema().select("id"), false, 7)) {
+ dateTable,
+ task,
+ dateTable.schema(),
+ dateTable.schema().select("id"),
+ false,
+ conf,
+ null)) {
while (reader.next()) {
ColumnarBatch columnarBatch = reader.get();
int numOfCols = columnarBatch.numCols();