requiredColumns = new ArrayList<>();
+ for (Type col : requestedSchema.asGroupType().getFields()) {
+ requiredColumns.add(col.getName());
+ }
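+ // The returned handle is a pointer to the native reader context created by
+ // Native.initRecordBatchReader; it must be released via Native.closeRecordBatchReader in close().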
+ this.handle = Native.initRecordBatchReader(filePath, start, length, requiredColumns.toArray());
+ totalRowCount = Native.numTotalRows(handle);
+ isInitialized = true;
+ }
+
+ public void setSparkSchema(StructType schema) {
+ this.sparkSchema = schema;
+ }
+
+ public AbstractColumnReader[] getColumnReaders() {
+ return columnReaders;
+ }
+
+ @Override
+ public void initialize(InputSplit inputSplit, TaskAttemptContext taskAttemptContext)
+ throws IOException, InterruptedException {
+ // Do nothing. The initialization work is done in 'init' already.
+ }
+
+ @Override
+ public boolean nextKeyValue() throws IOException {
+ return nextBatch();
+ }
+
+ @Override
+ public Void getCurrentKey() {
+ return null;
+ }
+
+ @Override
+ public ColumnarBatch getCurrentValue() {
+ return currentBatch();
+ }
+
+ @Override
+ public float getProgress() {
+ return (float) rowsRead / totalRowCount;
+ }
+
+ /**
+ * Returns the current columnar batch being read.
+ *
+ * Note that this must be called AFTER {@link NativeBatchReader#nextBatch()}.
+ */
+ public ColumnarBatch currentBatch() {
+ return currentBatch;
+ }
+
+ /**
+ * Loads the next batch of rows. This is called by both Spark and Iceberg.
+ *
+ * @return true if a new batch was loaded, false if there are no more rows to read.
+ */
+ public boolean nextBatch() throws IOException {
+ Preconditions.checkState(isInitialized, "init() should be called first!");
+
+ if (rowsRead >= totalRowCount) return false;
+ int batchSize;
+
+ try {
+ batchSize = loadNextBatch();
+ } catch (RuntimeException e) {
+ // Spark checks for certain exceptions, e.g. `SchemaColumnConvertNotSupportedException`,
+ // so rethrow RuntimeExceptions as-is instead of wrapping them in an IOException.
+ throw e;
+ } catch (Throwable e) {
+ throw new IOException(e);
+ }
+
+ if (batchSize == 0) return false;
+
+ long totalDecodeTime = 0, totalLoadTime = 0;
+ for (int i = 0; i < columnReaders.length; i++) {
+ AbstractColumnReader reader = columnReaders[i];
+ long startNs = System.nanoTime();
+ // TODO: read from native reader
+ reader.readBatch(batchSize);
+ // totalDecodeTime += System.nanoTime() - startNs;
+ // startNs = System.nanoTime();
+ vectors[i] = reader.currentBatch();
+ totalLoadTime += System.nanoTime() - startNs;
+ }
+
+ // TODO: (ARROW NATIVE) Add Metrics
+ // SQLMetric decodeMetric = metrics.get("ParquetNativeDecodeTime");
+ // if (decodeMetric != null) {
+ // decodeMetric.add(totalDecodeTime);
+ // }
+ SQLMetric loadMetric = metrics.get("ParquetNativeLoadTime");
+ if (loadMetric != null) {
+ loadMetric.add(totalLoadTime);
+ }
+
+ currentBatch.setNumRows(batchSize);
+ rowsRead += batchSize;
+ return true;
+ }
+
+ @Override
+ public void close() throws IOException {
+ if (columnReaders != null) {
+ for (AbstractColumnReader reader : columnReaders) {
+ if (reader != null) {
+ reader.close();
+ }
+ }
+ }
+ if (importer != null) {
+ importer.close();
+ importer = null;
+ }
+ Native.closeRecordBatchReader(this.handle);
+ }
+
+ @SuppressWarnings("deprecation")
+ private int loadNextBatch() throws Throwable {
+ long startNs = System.nanoTime();
+
+ int batchSize = Native.readNextRecordBatch(this.handle);
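+ // Close the previous schema importer (if any) and create a fresh one, so that Arrow
+ // structures imported for the previous batch are released before importing new ones.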
+ if (importer != null) importer.close();
+ importer = new CometSchemaImporter(ALLOCATOR);
+
+ List<ColumnDescriptor> columns = requestedSchema.getColumns();
+ for (int i = 0; i < columns.size(); i++) {
+ // TODO: (ARROW NATIVE) check this. Currently not handling missing columns correctly?
+ if (missingColumns[i]) continue;
+ if (columnReaders[i] != null) columnReaders[i].close();
+ // TODO: (ARROW NATIVE) handle tz, datetime & int96 rebase
+ DataType dataType = sparkSchema.fields()[i].dataType();
+ NativeColumnReader reader =
+ new NativeColumnReader(
+ this.handle,
+ i,
+ dataType,
+ columns.get(i),
+ importer,
+ capacity,
+ useDecimal128,
+ useLegacyDateTimestamp);
+ columnReaders[i] = reader;
+ }
+ return batchSize;
+ }
+
+ // Signature of externalAccums changed from returning a Buffer to returning a Seq. If Comet
+ // expects a Buffer but the Spark version returns a Seq (or vice versa), we get a
+ // method-not-found error, hence the reflection below.
+ @SuppressWarnings("unchecked")
+ private Option<AccumulatorV2<?, ?>> getTaskAccumulator(TaskMetrics taskMetrics) {
+ Method externalAccumsMethod;
+ try {
+ externalAccumsMethod = TaskMetrics.class.getDeclaredMethod("externalAccums");
+ externalAccumsMethod.setAccessible(true);
+ String returnType = externalAccumsMethod.getReturnType().getName();
+ if (returnType.equals("scala.collection.mutable.Buffer")) {
+ return ((Buffer<AccumulatorV2<?, ?>>) externalAccumsMethod.invoke(taskMetrics))
+ .lastOption();
+ } else if (returnType.equals("scala.collection.Seq")) {
+ return ((Seq<AccumulatorV2<?, ?>>) externalAccumsMethod.invoke(taskMetrics)).lastOption();
+ } else {
+ return Option.apply(null); // None
+ }
+ } catch (NoSuchMethodException | InvocationTargetException | IllegalAccessException e) {
+ return Option.apply(null); // None
+ }
+ }
+}
diff --git a/common/src/main/java/org/apache/comet/parquet/NativeColumnReader.java b/common/src/main/java/org/apache/comet/parquet/NativeColumnReader.java
new file mode 100644
index 0000000000..448ba0fec6
--- /dev/null
+++ b/common/src/main/java/org/apache/comet/parquet/NativeColumnReader.java
@@ -0,0 +1,190 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.comet.parquet;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import org.apache.arrow.c.ArrowArray;
+import org.apache.arrow.c.ArrowSchema;
+import org.apache.arrow.c.CometSchemaImporter;
+import org.apache.arrow.memory.BufferAllocator;
+import org.apache.arrow.memory.RootAllocator;
+import org.apache.arrow.vector.FieldVector;
+import org.apache.arrow.vector.dictionary.Dictionary;
+import org.apache.arrow.vector.types.pojo.DictionaryEncoding;
+import org.apache.parquet.column.ColumnDescriptor;
+import org.apache.parquet.column.page.*;
+import org.apache.parquet.schema.LogicalTypeAnnotation;
+import org.apache.spark.sql.types.DataType;
+
+import org.apache.comet.vector.*;
+
+// TODO: extend ColumnReader instead of AbstractColumnReader to reduce code duplication
+public class NativeColumnReader extends AbstractColumnReader {
+ protected static final Logger LOG = LoggerFactory.getLogger(NativeColumnReader.class);
+ protected final BufferAllocator ALLOCATOR = new RootAllocator();
+
+ /**
+ * The current Comet vector holding all the values read by this column reader. Owned by this
+ * reader and MUST be closed after use.
+ */
+ private CometDecodedVector currentVector;
+
+ /** Dictionary values for this column. Only set if the column is using dictionary encoding. */
+ protected CometDictionary dictionary;
+
+ /**
+ * The number of values in the current batch, used when we skip importing the Arrow vectors,
+ * in which case we simply update the null count of the existing vectors.
+ */
+ int currentNumValues;
+
+ /**
+ * Whether the last loaded vector contains any null value. This is used to determine if we can
+ * skip vector reloading. If the flag is false, the Arrow C API will skip importing the
+ * validity buffer, and therefore we cannot skip vector reloading.
+ */
+ boolean hadNull;
+
+ private final CometSchemaImporter importer;
+
+ private ArrowArray array = null;
+ private ArrowSchema schema = null;
+
+ private long nativeBatchHandle = 0xDEADBEEFL;
+ private final int columnNum;
+
+ public NativeColumnReader(
+ long nativeBatchHandle,
+ int columnNum,
+ DataType type,
+ ColumnDescriptor descriptor,
+ CometSchemaImporter importer,
+ int batchSize,
+ boolean useDecimal128,
+ boolean useLegacyDateTimestamp) {
+ super(type, descriptor, useDecimal128, useLegacyDateTimestamp);
+ assert batchSize > 0 : "Batch size must be positive, found " + batchSize;
+ this.batchSize = batchSize;
+ this.importer = importer;
+ this.nativeBatchHandle = nativeBatchHandle;
+ this.columnNum = columnNum;
+ initNative();
+ }
+
+ @Override
+ // Override in order to avoid creation of JVM side column readers
+ protected void initNative() {
+ LOG.debug(
+ "Native column reader " + String.join(".", this.descriptor.getPath()) + " is initialized");
+ nativeHandle = 0;
+ }
+
+ @Override
+ public void readBatch(int total) {
+ LOG.debug("Reading column batch of size = " + total);
+
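+ // The Arrow data itself is imported lazily in currentBatch()/loadVector(); here we only
+ // record how many values the caller expects in this batch.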
+ this.currentNumValues = total;
+ }
+
+ /** Returns the {@link CometVector} read by this reader. */
+ @Override
+ public CometVector currentBatch() {
+ return loadVector();
+ }
+
+ @Override
+ public void close() {
+ if (currentVector != null) {
+ currentVector.close();
+ currentVector = null;
+ }
+ super.close();
+ }
+
+ /** Returns a decoded {@link CometDecodedVector Comet vector}. */
+ public CometDecodedVector loadVector() {
+
+ LOG.debug("Loading vector for next batch");
+
+ // Close the previous vector first to release struct memory allocated to import Arrow array &
+ // schema from native side, through the C data interface
+ if (currentVector != null) {
+ currentVector.close();
+ }
+
+ LogicalTypeAnnotation logicalTypeAnnotation =
+ descriptor.getPrimitiveType().getLogicalTypeAnnotation();
+ boolean isUuid =
+ logicalTypeAnnotation instanceof LogicalTypeAnnotation.UUIDLogicalTypeAnnotation;
+
+ array = ArrowArray.allocateNew(ALLOCATOR);
+ schema = ArrowSchema.allocateNew(ALLOCATOR);
+
+ long arrayAddr = array.memoryAddress();
+ long schemaAddr = schema.memoryAddress();
+
+ Native.currentColumnBatch(nativeBatchHandle, columnNum, arrayAddr, schemaAddr);
+
+ FieldVector vector = importer.importVector(array, schema);
+
+ DictionaryEncoding dictionaryEncoding = vector.getField().getDictionary();
+
+ CometPlainVector cometVector = new CometPlainVector(vector, useDecimal128);
+
+ // Update whether the current vector contains any null values. This is used in the following
+ // batches to determine whether we can skip loading the native vector.
+ hadNull = cometVector.hasNull();
+
+ if (dictionaryEncoding == null) {
+ if (dictionary != null) {
+ // This means the column was using dictionary encoding but has now fallen back to plain
+ // encoding on the native side. Set 'dictionary' to null here, so we can use it as a
+ // condition to check whether we can re-use the vector later.
+ dictionary = null;
+ }
+ // Either the column is not dictionary encoded, or it was using dictionary encoding but
+ // a new data page has switched back to use plain encoding. For both cases we should
+ // return plain vector.
+ currentVector = cometVector;
+ return currentVector;
+ }
+
+ // We have to re-initialize the `CometDictionary` here because the `Data.importVector` API
+ // releases the previous dictionary vector and creates a new one.
+ Dictionary arrowDictionary = importer.getProvider().lookup(dictionaryEncoding.getId());
+ CometPlainVector dictionaryVector =
+ new CometPlainVector(arrowDictionary.getVector(), useDecimal128, isUuid);
+ if (dictionary != null) {
+ dictionary.setDictionaryVector(dictionaryVector);
+ } else {
+ dictionary = new CometDictionary(dictionaryVector);
+ }
+
+ currentVector =
+ new CometDictionaryVector(
+ cometVector, dictionary, importer.getProvider(), useDecimal128, false, isUuid);
+ return currentVector;
+ }
+}
diff --git a/common/src/main/scala/org/apache/comet/CometConf.scala b/common/src/main/scala/org/apache/comet/CometConf.scala
index 09355446c9..275114a11c 100644
--- a/common/src/main/scala/org/apache/comet/CometConf.scala
+++ b/common/src/main/scala/org/apache/comet/CometConf.scala
@@ -85,7 +85,17 @@ object CometConf extends ShimCometConf {
"read supported data sources (currently only Parquet is supported natively)." +
" By default, this config is true.")
.booleanConf
- .createWithDefault(true)
+ .createWithDefault(false)
+
+ val COMET_NATIVE_ARROW_SCAN_ENABLED: ConfigEntry[Boolean] = conf(
+ "spark.comet.native.arrow.scan.enabled")
+ .internal()
+ .doc(
+ "Whether to enable the fully native arrow based scan. When this is turned on, Spark will " +
+ "use Comet to read Parquet files natively via the Arrow based Parquet reader." +
+ " By default, this config is false.")
+ .booleanConf
+ .createWithDefault(false)
val COMET_PARQUET_PARALLEL_IO_ENABLED: ConfigEntry[Boolean] =
conf("spark.comet.parquet.read.parallel.io.enabled")
diff --git a/native/core/src/parquet/mod.rs b/native/core/src/parquet/mod.rs
index 455f19929f..ffd3421671 100644
--- a/native/core/src/parquet/mod.rs
+++ b/native/core/src/parquet/mod.rs
@@ -23,6 +23,7 @@ pub use mutable_vector::*;
pub mod util;
pub mod read;
+use std::fs::File;
use std::{boxed::Box, ptr::NonNull, sync::Arc};
use crate::errors::{try_unwrap_or_throw, CometError};
@@ -39,10 +40,18 @@ use jni::{
},
};
+use crate::execution::operators::ExecutionError;
use crate::execution::utils::SparkArrowConvert;
use arrow::buffer::{Buffer, MutableBuffer};
-use jni::objects::{JBooleanArray, JLongArray, JPrimitiveArray, ReleaseMode};
+use arrow_array::{Array, RecordBatch};
+use jni::objects::{
+ JBooleanArray, JLongArray, JObjectArray, JPrimitiveArray, JString, ReleaseMode,
+};
+use jni::sys::jstring;
+use parquet::arrow::arrow_reader::{ParquetRecordBatchReader, ParquetRecordBatchReaderBuilder};
+use parquet::arrow::ProjectionMask;
use read::ColumnReader;
+use url::Url;
use util::jni::{convert_column_descriptor, convert_encoding};
use self::util::jni::TypePromotionInfo;
@@ -582,3 +591,212 @@ fn from_u8_slice(src: &mut [u8]) -> &mut [i8] {
let raw_ptr = src.as_mut_ptr() as *mut i8;
unsafe { std::slice::from_raw_parts_mut(raw_ptr, src.len()) }
}
+
+// TODO: (ARROW NATIVE) remove this if not needed.
+enum ParquetReaderState {
+ Init,
+ Reading,
+ Complete,
+}
+/// Parquet read context maintained across multiple JNI calls.
+struct BatchContext {
+ batch_reader: ParquetRecordBatchReader,
+ current_batch: Option<RecordBatch>,
+ reader_state: ParquetReaderState,
+ num_row_groups: i32,
+ total_rows: i64,
+}
+
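+/// Reinterprets the JVM-provided handle as a mutable reference to the `BatchContext`
+/// created by `initRecordBatchReader`; the context stays alive until `closeRecordBatchReader`.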
+#[inline]
+fn get_batch_context<'a>(handle: jlong) -> Result<&'a mut BatchContext, CometError> {
+ unsafe {
+ (handle as *mut BatchContext)
+ .as_mut()
+ .ok_or_else(|| CometError::NullPointer("null batch context handle".to_string()))
+ }
+}
+
+#[inline]
+fn get_batch_reader<'a>(handle: jlong) -> Result<&'a mut ParquetRecordBatchReader, CometError> {
+ Ok(&mut get_batch_context(handle)?.batch_reader)
+}
+
+#[no_mangle]
+pub extern "system" fn Java_org_apache_comet_parquet_Native_initRecordBatchReader(
+ e: JNIEnv,
+ _jclass: JClass,
+ file_path: jstring,
+ start: jlong,
+ length: jlong,
+ required_columns: jobjectArray,
+) -> jlong {
+ try_unwrap_or_throw(&e, |mut env| unsafe {
+ let path: String = env
+ .get_string(&JString::from_raw(file_path))
+ .unwrap()
+ .into();
+ //TODO: (ARROW NATIVE) - this works only for 'file://' urls
+ let path = Url::parse(path.as_ref()).unwrap().to_file_path().unwrap();
+ let file = File::open(path).unwrap();
+
+ // Create a async parquet reader builder with batch_size.
+ // batch_size is the number of rows to read up to buffer once from pages, defaults to 1024
+ // TODO: (ARROW NATIVE) Use async reader ParquetRecordBatchStreamBuilder
+ let mut builder = ParquetRecordBatchReaderBuilder::try_new(file)
+ .unwrap()
+ .with_batch_size(8192); // TODO: (ARROW NATIVE) Use batch size configured in JVM
+
+ //TODO: (ARROW NATIVE) if we can get the ParquetMetadata serialized, we need not do this.
+ let metadata = builder.metadata().clone();
+
+ let mut columns_to_read: Vec<usize> = Vec::new();
+ let columns_to_read_array = JObjectArray::from_raw(required_columns);
+ let array_len = env.get_array_length(&columns_to_read_array)?;
+ let mut required_columns: Vec<String> = Vec::new();
+ for i in 0..array_len {
+ let p: JString = env
+ .get_object_array_element(&columns_to_read_array, i)?
+ .into();
+ required_columns.push(env.get_string(&p)?.into());
+ }
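+ // Map the requested column names (case-insensitively) to leaf column indices in the
+ // Parquet schema.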
+ for (i, col) in metadata
+ .file_metadata()
+ .schema_descr()
+ .columns()
+ .iter()
+ .enumerate()
+ {
+ for required in required_columns.iter() {
+ if col.name().to_uppercase().eq(&required.to_uppercase()) {
+ columns_to_read.push(i);
+ break;
+ }
+ }
+ }
+ //TODO: (ARROW NATIVE) make this work for complex types (especially deeply nested structs)
+ let mask = ProjectionMask::leaves(metadata.file_metadata().schema_descr(), columns_to_read);
+ // Restrict the projection to the requested leaf columns.
+ builder = builder.with_projection(mask);
+
+ let mut row_groups_to_read: Vec<usize> = Vec::new();
+ let mut total_rows: i64 = 0;
+ // Select only the row groups whose byte ranges fall entirely within this split.
+ for (i, rg) in metadata.row_groups().iter().enumerate() {
+ let rg_start = rg.file_offset().unwrap();
+ let rg_end = rg_start + rg.compressed_size();
+ if rg_start >= start && rg_end <= start + length {
+ row_groups_to_read.push(i);
+ total_rows += rg.num_rows();
+ }
+ }
+
+ // Build a sync parquet reader.
+ let batch_reader = builder
+ .with_row_groups(row_groups_to_read.clone())
+ .build()
+ .unwrap();
+
+ let ctx = BatchContext {
+ batch_reader,
+ current_batch: None,
+ reader_state: ParquetReaderState::Init,
+ num_row_groups: row_groups_to_read.len() as i32,
+ total_rows,
+ };
+ let res = Box::new(ctx);
+ Ok(Box::into_raw(res) as i64)
+ })
+}
+
+#[no_mangle]
+pub extern "system" fn Java_org_apache_comet_parquet_Native_numRowGroups(
+ e: JNIEnv,
+ _jclass: JClass,
+ handle: jlong,
+) -> jint {
+ try_unwrap_or_throw(&e, |_env| {
+ let context = get_batch_context(handle)?;
+ // Return the number of row groups selected for this split.
+ Ok(context.num_row_groups)
+ }) as jint
+}
+
+#[no_mangle]
+pub extern "system" fn Java_org_apache_comet_parquet_Native_numTotalRows(
+ e: JNIEnv,
+ _jclass: JClass,
+ handle: jlong,
+) -> jlong {
+ try_unwrap_or_throw(&e, |_env| {
+ let context = get_batch_context(handle)?;
+ // Return the total number of rows across the selected row groups.
+ Ok(context.total_rows)
+ }) as jlong
+}
+
+#[no_mangle]
+pub extern "system" fn Java_org_apache_comet_parquet_Native_readNextRecordBatch(
+ e: JNIEnv,
+ _jclass: JClass,
+ handle: jlong,
+) -> jint {
+ try_unwrap_or_throw(&e, |_env| {
+ let context = get_batch_context(handle)?;
+ let batch_reader = &mut context.batch_reader;
+ // Read data
+ let mut rows_read: i32 = 0;
+ let batch = batch_reader.next();
+
+ match batch {
+ Some(record_batch) => {
+ let batch = record_batch?;
+ rows_read = batch.num_rows() as i32;
+ context.current_batch = Some(batch);
+ context.reader_state = ParquetReaderState::Reading;
+ }
+ None => {
+ context.current_batch = None;
+ context.reader_state = ParquetReaderState::Complete;
+ }
+ }
+ Ok(rows_read)
+ })
+}
+
+#[no_mangle]
+pub extern "system" fn Java_org_apache_comet_parquet_Native_currentColumnBatch(
+ e: JNIEnv,
+ _jclass: JClass,
+ handle: jlong,
+ column_idx: jint,
+ array_addr: jlong,
+ schema_addr: jlong,
+) {
+ try_unwrap_or_throw(&e, |_env| {
+ let context = get_batch_context(handle)?;
+ let current_batch = context
+ .current_batch
+ .as_mut()
+ .ok_or_else(|| CometError::Execution {
+ source: ExecutionError::GeneralError("There is no more data to read".to_string()),
+ })?;
+ let data = current_batch.column(column_idx as usize).into_data();
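+ // Export the Arrow column to the JVM through the Arrow C data interface, writing the
+ // ArrowArray/ArrowSchema structs at the addresses provided by the Java side.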
+ data.move_to_spark(array_addr, schema_addr)
+ .map_err(|e| e.into())
+ })
+}
+
+#[no_mangle]
+pub extern "system" fn Java_org_apache_comet_parquet_Native_closeRecordBatchReader(
+ env: JNIEnv,
+ _jclass: JClass,
+ handle: jlong,
+) {
+ try_unwrap_or_throw(&env, |_| {
+ unsafe {
+ let ctx = handle as *mut BatchContext;
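+ // Reconstitute the Box created in initRecordBatchReader and drop it, releasing the reader.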
+ let _ = Box::from_raw(ctx);
+ };
+ Ok(())
+ })
+}
diff --git a/spark/src/main/scala/org/apache/comet/parquet/CometParquetFileFormat.scala b/spark/src/main/scala/org/apache/comet/parquet/CometParquetFileFormat.scala
index 52d8d09a0d..4c96bef4e9 100644
--- a/spark/src/main/scala/org/apache/comet/parquet/CometParquetFileFormat.scala
+++ b/spark/src/main/scala/org/apache/comet/parquet/CometParquetFileFormat.scala
@@ -100,6 +100,7 @@ class CometParquetFileFormat extends ParquetFileFormat with MetricsSupport with
// Comet specific configurations
val capacity = CometConf.COMET_BATCH_SIZE.get(sqlConf)
+ val nativeArrowReaderEnabled = CometConf.COMET_NATIVE_ARROW_SCAN_ENABLED.get(sqlConf)
(file: PartitionedFile) => {
val sharedConf = broadcastedHadoopConf.value.value
@@ -134,22 +135,42 @@ class CometParquetFileFormat extends ParquetFileFormat with MetricsSupport with
}
pushed.foreach(p => ParquetInputFormat.setFilterPredicate(sharedConf, p))
- val batchReader = new BatchReader(
- sharedConf,
- file,
- footer,
- capacity,
- requiredSchema,
- isCaseSensitive,
- useFieldId,
- ignoreMissingIds,
- datetimeRebaseSpec.mode == CORRECTED,
- partitionSchema,
- file.partitionValues,
- JavaConverters.mapAsJavaMap(metrics))
- val iter = new RecordReaderIterator(batchReader)
+ val recordBatchReader =
+ if (nativeArrowReaderEnabled) {
+ val batchReader = new NativeBatchReader(
+ sharedConf,
+ file,
+ footer,
+ capacity,
+ requiredSchema,
+ isCaseSensitive,
+ useFieldId,
+ ignoreMissingIds,
+ datetimeRebaseSpec.mode == CORRECTED,
+ partitionSchema,
+ file.partitionValues,
+ JavaConverters.mapAsJavaMap(metrics))
+ batchReader.init()
+ batchReader
+ } else {
+ val batchReader = new BatchReader(
+ sharedConf,
+ file,
+ footer,
+ capacity,
+ requiredSchema,
+ isCaseSensitive,
+ useFieldId,
+ ignoreMissingIds,
+ datetimeRebaseSpec.mode == CORRECTED,
+ partitionSchema,
+ file.partitionValues,
+ JavaConverters.mapAsJavaMap(metrics))
+ batchReader.init()
+ batchReader
+ }
+ val iter = new RecordReaderIterator(recordBatchReader)
try {
- batchReader.init()
iter.asInstanceOf[Iterator[InternalRow]]
} catch {
case e: Throwable =>
diff --git a/spark/src/test/scala/org/apache/spark/sql/CometTestBase.scala b/spark/src/test/scala/org/apache/spark/sql/CometTestBase.scala
index 35ba069029..99ed5d3cb2 100644
--- a/spark/src/test/scala/org/apache/spark/sql/CometTestBase.scala
+++ b/spark/src/test/scala/org/apache/spark/sql/CometTestBase.scala
@@ -79,6 +79,9 @@ abstract class CometTestBase
conf.set(CometConf.COMET_EXEC_ENABLED.key, "true")
conf.set(CometConf.COMET_EXEC_SHUFFLE_ENABLED.key, "true")
conf.set(CometConf.COMET_SPARK_TO_ARROW_ENABLED.key, "true")
+ conf.set(CometConf.COMET_NATIVE_SCAN_ENABLED.key, "true")
+ conf.set(CometConf.COMET_FULL_NATIVE_SCAN_ENABLED.key, "true")
+ conf.set(CometConf.COMET_NATIVE_ARROW_SCAN_ENABLED.key, "false")
conf.set(CometConf.COMET_MEMORY_OVERHEAD.key, "2g")
conf.set(CometConf.COMET_EXEC_SORT_MERGE_JOIN_WITH_JOIN_FILTER_ENABLED.key, "true")
conf